Logstash: getting fields from another event

My Logstash config:
input {
  beats {
    port => 5002
  }
}

filter {
  grok {
    add_tag => [ "first" ]
    match => { "message" => '".*definitionName": "(?<build_DefinitionName>.*?)"' }
  }
  grok {
    add_tag => [ "first" ]
    match => { "message" => '".*requestedFor": "(?<build_RequesterName>.*?)"' }
  }
  grok {
    add_tag => [ "first" ]
    match => { "message" => '".*buildNumber": "(?<build_BuildNumber>.*?)"' }
  }
  grok {
    add_tag => [ "first" ]
    match => { "message" => '".*teamProject": "(?<build_TeamProject>.*?)"' }
  }
  grok {
    add_tag => [ "first" ]
    match => { "message" => "(?<build_ErrorMessage> ERR .*)" }
  }
  grok {
    add_tag => [ "end" ]
    match => { "message" => "Current state: job state = '(?<build_IsFailed>.+)'" }
  }
  if "end" in [tags] {
    aggregate {
      task_id => "%{source}"
      code => "event.set('build_DefinitionName', '%{build_DefinitionName}')
        event.set('key', 'value')
        map['build_DefinitionName'] ||= event.get('build_DefinitionName')
        event.set('build_BuildNumber', map['build_BuildNumber'])
        event.set('build_RequesterName', map['build_RequesterName'])
        event.set('build_TeamProject', map['build_TeamProject'])
        event.set('build_IsFailed', map['build_IsFailed'])
        event.set('build_ErrorMessage', map['build_ErrorMessage'])"
      map_action => "create_or_update"
      push_previous_map_as_event => true
      end_of_task => false
    }
  }
  if "first" not in [tags] and "end" not in [tags] {
    drop { }
  }
  mutate {
    remove_field => [ "message" ]
  }
  mutate {
    remove_tag => [ "first" ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "mylog-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}

Here, whenever a log line matches one of the grok patterns above, that field is created and indexed in Elasticsearch as a separate event. What we want is that, when the last grok match happens, it should pull the fields from the previous events and index everything as a single event, using source as the common field. How can we achieve this?

With the above config, the final event in Kibana has no value for fields like build_DefinitionName, build_TeamProject, etc.

Hi,

First, some remarks:

  • The %{build_DefinitionName} syntax does not work inside the code option. You should use event.get('build_DefinitionName') instead.
  • The map object is not auto-filled. You must fill it yourself, each time you receive a "first" event, with map['field'] = event.get('field').
  • push_previous_map_as_event is especially useful when you have no tagged "end" event and events are not interlaced; it was designed primarily for the jdbc input case. In your case you do have a tagged "end" event, and since you are shipping logs and metrics you will probably receive interlaced events: first an event with source "source1", then one with source "source2", then again one with "source1". push_previous_map_as_event requires that events be sorted per task_id.
  • end_of_task => true is useful to destroy the map object (associated with the task_id) once you no longer need it. This matters so that maps are not stacked in memory forever (see the sketch after this list for a timeout-based safety net).
  • Finally, if you want the "first" event fields merged into the "end" event, you must not use the push* aggregate options, which always create a fresh new event from the map object.
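
As an aside, related to the memory point above: if an "end" event can be missing for some builds (for example, the job dies before logging its final state), the aggregate filter also has timeout options that flush a stale map instead of keeping it forever. This is only a sketch, assuming your task_id stays %{source}; the 600-second timeout and the _aggregatetimeout tag are arbitrary example values:

if "first" in [tags] {
  aggregate {
    task_id => "%{source}"
    code => "
      map['build_DefinitionName'] = event.get('build_DefinitionName') unless event.get('build_DefinitionName').nil?
      # fill the other build_* fields exactly as in the configuration below
    "
    timeout => 600                         # seconds of inactivity before the map is flushed
    push_map_as_event_on_timeout => true   # emit the partial map as its own event
    timeout_task_id_field => "source"      # copy the task_id back into the flushed event
    timeout_tags => ["_aggregatetimeout"]  # tag it so incomplete builds are easy to find in Kibana
  }
}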

That said, to merge the "first" event fields into the "end" event, I suggest this Logstash aggregate configuration:

if "first" in [tags] {
  aggregate {
    task_id => "%{source}"
    code => "
      map['build_DefinitionName'] = event.get('build_DefinitionName') unless event.get('build_DefinitionName').nil?
      map['build_RequesterName'] = event.get('build_RequesterName') unless event.get('build_RequesterName').nil?
      map['build_BuildNumber'] = event.get('build_BuildNumber') unless event.get('build_BuildNumber').nil?
      map['build_TeamProject'] = event.get('build_TeamProject') unless event.get('build_TeamProject').nil?
      map['build_ErrorMessage'] = event.get('build_ErrorMessage') unless event.get('build_ErrorMessage').nil?
    "
  }
}

if "end" in [tags] {
  aggregate {
    task_id => "%{source}"
    code => "
      event.set('build_DefinitionName', map['build_DefinitionName'])
      event.set('build_RequesterName', map['build_RequesterName'])
      event.set('build_BuildNumber', map['build_BuildNumber'])
      event.set('build_TeamProject', map['build_TeamProject'])
      event.set('build_ErrorMessage', map['build_ErrorMessage'])
    "
    end_of_task => true
  }
}
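
One practical note that is not from this thread but from the aggregate filter documentation: the filter keeps its maps in memory per worker, so the pipeline running it should use a single filter worker, otherwise events sharing the same source may be processed out of order and the maps will be incomplete. A minimal sketch of the two usual ways to do that; builds.conf, the pipeline id and the config path are placeholders:

# either limit the whole Logstash process to one worker:
bin/logstash -f builds.conf -w 1

# or, when running multiple pipelines, pin just this one in pipelines.yml:
- pipeline.id: builds
  path.config: "/etc/logstash/conf.d/builds.conf"
  pipeline.workers: 1   # aggregate needs one worker so every event of a task_id meets the same map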

Yes, this serves the purpose. Now we will go ahead and implement the ELK monitoring stack across our organization.
Thanks a lot!


Nice !
