Logstash parent-child event configuration

2016-08-16 14:00:14,655 ABC BETA P1 DecodeFieldList Refresh
<6>,TRDPRC_1,Price,
<15>,CURRENCY,String,0
<22>,BID,Price,
<23>,BID_1,Price,
<24>,BID_2,Price,
<25>,ASK,Price,
<26>,ASK_1,Price,
<27>,ASK_2,Price,
Above is my log sample: the first line is the parent info and the rest are the children info. I have tested the parent pattern and the children pattern in the grok debugger, as shown below:

  • Parent pattern

    %{DATESTAMP:EventTime},%{NUMBER:Mil:int} %{WORD:Type} %{GREEDYDATA:Item} %{GREEDYDATA:RIC} %{GREEDYDATA:Detail} %{GREEDYDATA:Category}

  • Children pattern
    \<%{NUMBER:FID:int}\>,%{GREEDYDATA:FName},%{WORD:FType},%{GREEDYDATA:FValue}
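As a rough self-check outside the grok debugger, the two patterns can be approximated with plain Ruby regexes against the sample lines. This is only an illustration: the named groups below are hand-written stand-ins for grok's DATESTAMP, NUMBER, WORD, and GREEDYDATA, not grok itself.

```ruby
# Approximate Ruby equivalents of the two grok patterns (illustration only;
# grok's DATESTAMP/GREEDYDATA are more permissive than these stand-ins).
PARENT = /^(?<EventTime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(?<Mil>\d+) (?<Type>\w+) (?<Item>\S+) (?<RIC>\S+) (?<Detail>\S+) (?<Category>\S+)$/
CHILD  = /^<(?<FID>\d+)>,(?<FName>[^,]*),(?<FType>\w+),(?<FValue>.*)$/

p_match = PARENT.match("2016-08-16 14:00:14,655 ABC BETA P1 DecodeFieldList Refresh")
c_match = CHILD.match("<6>,TRDPRC_1,Price,")

puts p_match[:EventTime]  # => "2016-08-16 14:00:14"
puts c_match[:FID]        # => "6"
```

A child line that does not match PARENT is exactly what produces the _grokparsefailure tag used later to tell the two event kinds apart.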

How can I store this sample into two ES types via a Logstash configuration file?

Hope you can help! Many thanks.


Solved it! Here is my thinking:

  1. Define a pattern for the parent event first. Since the children events will not match it, Logstash adds a _grokparsefailure tag to them, so we can tell whether the current event is a parent or a child from that tag.
  2. Use a filter -> ruby block to generate the document_id and keep it in a global variable, so that children events can access it.
  3. Add a field such as DocID to both parent and children events to store the document_id from step 2, and add a field such as ParentID only to children events to store the parent's document_id.
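The ID scheme in steps 2-3 can be sketched in plain Ruby, outside Logstash. The field names and the sample values are taken from the log above; the hashing choice (MD5 of the parent timestamp, and of timestamp + FID for children) mirrors the ruby filter blocks in the config:

```ruby
require 'digest/md5'

# Stand-alone sketch of the ID scheme: the parent's DocID is an MD5 of its
# timestamp, and each child's DocID is an MD5 of the parent's timestamp plus
# the child's FID; ParentID points the child back at its parent document.
event_time = '2016-08-16 14:00:14'
mil        = '655'

parent_id = Digest::MD5.hexdigest(event_time + mil)  # parent's DocID

child = { 'FID' => '6' }
child['ParentID'] = parent_id
child['DocID']    = Digest::MD5.hexdigest(event_time + mil + child['FID'])
```

Because the child DocID mixes in the FID, each child gets a distinct, deterministic document id, so re-shipping the same log lines overwrites the same ES documents instead of duplicating them.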

Below is the entire Logstash configuration:

input {
  beats {
    port => 5044
  }
}

filter {

  # remove the empty lines
  if [message] =~ /^\s*$/ {
    drop { }
  }

  # define the parent event pattern
  grok {
    match => { "message" => "%{DATESTAMP:EventTime},%{NUMBER:Mil:int} %{WORD:Type} %{GREEDYDATA:Item} %{GREEDYDATA:RIC} %{GREEDYDATA:Detail} %{GREEDYDATA:Category}" }
  }

  # children events
  if "_grokparsefailure" in [tags] {
    grok {
      match => { "message" => "<%{NUMBER:FID:int}>,%{GREEDYDATA:FName},%{WORD:FType},%{GREEDYDATA:FValue}" }
      add_field => { "DocID" => "" "ParentID" => "" }
      add_tag => ["%{FType}"]
      remove_tag => ["_grokparsefailure"]
    }
    ruby {
      code => "require 'digest/md5';
               event['ParentID'] = @@parentid;
               event['DocID'] = Digest::MD5.hexdigest(@@parentdate + event['FID'].to_s)"
    }
  } else {
    mutate {
      add_field => { "DocID" => "" }
      add_tag => ["parent"]
    }
    # define a global variable to keep the parent id
    # a default value must be set for the variables in the ruby -> init block, or the first read will raise an exception
    ruby {
      init => "@@parentid = ''; @@parentdate = ''"
      code => "require 'digest/md5';
               @@parentid = Digest::MD5.hexdigest(event['EventTime'] + event['Mil'].to_s);
               event['DocID'] = @@parentid;
               @@parentdate = event['EventTime'] + event['Mil'].to_s"
    }
  }

  # remove the redundant fields created by filebeat; skip this if you don't use filebeat as the shipper
  mutate {
    remove_field => ["[beat][hostname]","[beat][name]","count","fields","input_type","offset","type","beat","@version"]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    # set the document_id
    document_id => "%{DocID}"
    document_type => "%{[@metadata][type]}"
    #template => "/appserver/ELK/logstash-2.3.4/conf/template_tolreport.json"
    #template_name => "template_tolreport"
    #template_overwrite => true
  }

  file {
    path => "./test-%{+YYYY-MM-dd}.txt"
  }
}
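Why the `@@` class variables act as globals here: every event passing through the same ruby filter instance executes the `code` block against shared class-level state, so a value written while processing the parent is still visible when the following children are processed. A toy model of that behavior (plain Ruby, not Logstash; the `Pipeline` class and event hashes are made up for illustration):

```ruby
# Toy model of the shared @@ state between ruby filter invocations.
class Pipeline
  @@parentid = ''   # like the init block: must be initialized before first read

  def self.process(event)
    if event[:parent]
      @@parentid = event[:id]          # parent stores its id in shared state
    else
      event[:parent_id] = @@parentid   # children read the last parent's id
    end
    event
  end
end

Pipeline.process({ parent: true, id: 'abc' })
child = Pipeline.process({ parent: false })
puts child[:parent_id]  # => "abc"
```

Note that this only works when events are processed in order by a single pipeline worker; with multiple filter workers, a child could be processed before its parent has updated the shared variable.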

At last, thanks to the thread "keep global variable in logstash configuration", which showed me how to use Ruby syntax to define the global variable.