Solved it! Here is my approach:
1. First, define a grok pattern for the parent events. The child events will not match it, so Logstash adds a `_grokparsefailure` tag to them. That tag tells us whether the current event is a parent or a child.
2. Use a `ruby` filter block to generate the document_id and keep it in a global variable so that the child events can access it.
3. Add a field such as `DocID` to both parent and child events to store the document_id from step 2. Add a field such as `ParentID` only to child events to store the parent's document_id.
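As a rough illustration of steps 1 to 3 outside Logstash, here is a plain Ruby sketch. The two regular expressions are hypothetical, simplified stand-ins for the grok patterns (they are not what `DATESTAMP` or `GREEDYDATA` expand to). `link_events` is a made-up name, and the sample lines used with it are invented. Like the real config, the sketch assumes lines arrive in file order. In Logstash that generally means running with a single pipeline worker (`-w 1`), because the `@@` variables are shared by all workers.

```ruby
require 'digest/md5'

# Hypothetical, simplified stand-ins for the grok patterns in the config:
# parent lines look like "<date> <time>,<millis> <rest>",
# child lines look like "<FID>,FName,FType,FValue".
PARENT_RE = /\A(?<event_time>\d[^,]*),(?<mil>\d+) (?<rest>.*)\z/
CHILD_RE  = /\A<(?<fid>\d+)>,(?<fname>[^,]*),(?<ftype>\w+),(?<fvalue>.*)\z/

# Walks the lines in order and remembers the most recent parent,
# the way the config keeps it in @@parentid / @@parentdate.
def link_events(lines)
  parent_id = ''
  parent_date = ''
  lines.map do |line|
    next if line.strip.empty?                 # like the drop {} for blank lines
    if (m = PARENT_RE.match(line))            # step 1: the parent pattern matched
      parent_date = m[:event_time] + m[:mil]
      parent_id = Digest::MD5.hexdigest(parent_date)  # step 2: parent id
      { 'DocID' => parent_id, 'tags' => ['parent'] }
    elsif (m = CHILD_RE.match(line))          # parent pattern failed: a child
      { 'DocID' => Digest::MD5.hexdigest(parent_date + m[:fid]),
        'ParentID' => parent_id,              # step 3: link to the parent
        'tags' => [m[:ftype]] }
    end                                       # lines matching neither yield nil
  end.compact
end
```

For a parent line such as `11/10/2016 09:15:32,123 Update ItemA RIC1 Detail Cat` followed by `<22>,BID,PRICE,101.5`, the child event gets a `ParentID` equal to the parent's `DocID`. Its own `DocID` is the MD5 of the parent's timestamp plus milliseconds plus `22`.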
Below is the entire Logstash configuration:
```
input {
  beats {
    port => 5044
  }
}

filter {
  # remove empty lines
  if [message] =~ /^\s*$/ {
    drop { }
  }

  # define the parent event pattern
  grok {
    match => {"message" => "%{DATESTAMP:EventTime},%{NUMBER:Mil:int} %{WORD:Type} %{GREEDYDATA:Item} %{GREEDYDATA:RIC} %{GREEDYDATA:Detail} %{GREEDYDATA:Category}"}
  }

  # child events: they did not match the parent pattern, so they carry _grokparsefailure
  if "_grokparsefailure" in [tags] {
    grok {
      match => {"message" => "<%{NUMBER:FID:int}>,%{GREEDYDATA:FName},%{WORD:FType},%{GREEDYDATA:FValue}"}
      add_field => {"DocID" => '' "ParentID" => ''}
      add_tag => ["%{FType}"]
      remove_tag => ["_grokparsefailure"]
    }
    # link the child to the last parent seen and derive the child's own id
    # (.to_s because FID is converted to an integer by the :int suffix above)
    ruby {
      code => "require 'digest/md5';
        event['ParentID'] = @@parentid;
        event['DocID'] = Digest::MD5.hexdigest(@@parentdate + event['FID'].to_s)"
    }
  } else {
    mutate {
      add_field => {"DocID" => ''}
      add_tag => ["parent"]
    }
    # define global (class) variables to keep the parent id and parent date
    # they must get a default value in the ruby init block, otherwise reading them raises an exception
    ruby {
      init => "@@parentid = ''; @@parentdate = ''"
      code => "require 'digest/md5';
        @@parentid = Digest::MD5.hexdigest(event['EventTime'] + event['Mil'].to_s);
        event['DocID'] = @@parentid;
        @@parentdate = event['EventTime'] + event['Mil'].to_s"
    }
  }

  # remove the redundant fields created by Filebeat; you can skip this if you don't use Filebeat as the shipper
  mutate {
    remove_field => ["[beat][hostname]", "[beat][name]", "count", "fields", "input_type", "offset", "type", "beat", "@version"]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    # set the document_id
    document_id => "%{DocID}"
    document_type => "%{[@metadata][type]}"
    #template => "/appserver/ELK/logstash-2.3.4/conf/template_tolreport.json"
    #template_name => "template_tolreport"
    #template_overwrite => true
  }
  file {
    path => "./test-%{+YYYY-MM-dd}.txt"
  }
}
```
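One design point is worth spelling out. Because `DocID` is derived only from the log content, indexing the same lines again yields the same ids. With the output's default `index` action, Elasticsearch then overwrites the existing documents instead of adding duplicates. The plain Ruby sketch below mirrors the two id formulas from the `ruby` blocks. The helper names `parent_doc_id` and `child_doc_id` are hypothetical. The `.to_s` calls cover the case where grok has converted `Mil` or `FID` to integers, because `String + Integer` raises a `TypeError` in Ruby.

```ruby
require 'digest/md5'

# Hypothetical helper mirroring the parent ruby block:
# MD5 of EventTime followed by Mil (no separator).
# .to_s keeps this working if grok stored Mil as an integer.
def parent_doc_id(event_time, mil)
  Digest::MD5.hexdigest(event_time.to_s + mil.to_s)
end

# Hypothetical helper mirroring the child ruby block:
# MD5 of the parent's EventTime+Mil followed by the child's FID.
def child_doc_id(parent_date, fid)
  Digest::MD5.hexdigest(parent_date.to_s + fid.to_s)
end

# The same input always yields the same id, whether Mil is a string or an integer.
parent = parent_doc_id('11/10/2016 09:15:32', 123)
puts parent == parent_doc_id('11/10/2016 09:15:32', '123') # prints true
```

One consequence of this scheme is that a child id depends only on its parent's timestamp and its own FID. Two children with the same FID under the same parent would therefore share an id.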
Finally, thanks to the thread "keep global variable in logstash configuration", which showed me how to use Ruby syntax to define a global variable.