Logstash - Add fields between two unique events (start and stop)

Hi,

I have logs shipping to Logstash via Filebeat, and I enrich them in Logstash before passing them on to Elasticsearch.

One of the enrichments I would like to do is add a field to every event that falls between two unique events (a start event and a stop event). This also has to take into account a unique ID that applies to each set of documents.

For example, I have logs coming in from multiple sources, and each source has a [Match][ID]. The [Match][ID] remains the same for that set of documents. The enrichment above should apply per [Match][ID].

Example documents...

"message" : "log line of data..."       -- add_field => { "match_in_progress" => false}
"match_id" : "1234"
"message" : "log line of data..."       -- add_field => { "match_in_progress" => false}
"match_id" : "1234"
"message" : "log line of data..."       -- add_field => { "match_in_progress" => false}
"match_id" : "1234"
"message" : "StartMatch"      <----- this indicates my start event
"match_id" : "1234"
"message" : "log line of data..."       -- add_field => { "match_in_progress" => true }
"match_id" : "1234"
"message" : "log line of data..."       -- add_field => { "match_in_progress" => true }
"match_id" : "1234"
"message" : "log line of data..."       -- add_field => { "match_in_progress" => true }
"match_id" : "1234"
.. maybe 2 thousand more of these       -- add_field => { "match_in_progress" => true }
"match_id" : "1234"
"message" : "EndMatch"       <----- this indicates my stop event
"message" : "log line of data..."       -- add_field => { "match_in_progress" => false}
"match_id" : "1234"

There will be hundreds of different [Match][ID] values active at once.
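To pin down the behaviour being asked for, here is a minimal plain-Ruby sketch (not Logstash code). The `annotate` function and the `:match_id` / `:message` hash keys are hypothetical names used only for illustration, and it assumes the start event itself is marked `true` and the stop event `false`, which the example documents above leave unspecified.

```ruby
# Plain-Ruby sketch of the desired tagging; this is not Logstash code.
# Each event is a Hash with :match_id and :message (hypothetical shape).
def annotate(events)
  in_progress = Hash.new(false) # per-ID state; unknown IDs read as "not started"
  events.map do |event|
    id = event[:match_id]
    case event[:message]
    when "StartMatch" then in_progress[id] = true  # assumption: start event counts as in progress
    when "EndMatch"   then in_progress[id] = false # assumption: stop event counts as not in progress
    end
    event.merge(match_in_progress: in_progress[id])
  end
end

events = [
  { match_id: "1234", message: "log line of data..." },
  { match_id: "1234", message: "StartMatch" },
  { match_id: "5678", message: "log line of data..." }, # a different match, never started
  { match_id: "1234", message: "log line of data..." },
  { match_id: "1234", message: "EndMatch" },
  { match_id: "1234", message: "log line of data..." },
]

flags = annotate(events).map { |e| e[:match_in_progress] }
p flags # => [false, true, false, true, false, false]
```

The point of the sketch is that the state is keyed by match ID, so an event for "5678" is unaffected by the start event of "1234".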

Is it possible to do what I'm trying to achieve within Logstash, for example with the aggregate plugin? Or perhaps the elasticsearch plugin?

I have tried, but so far it doesn't seem to fit, or I just don't know how to use it properly. It seems to be more tailored to adding information to the final event?

I have also tried custom ruby code, like so:

      if [message] == "StartMatch" {
        #start
        mutate {
          add_tag => [ "match_started" ]
        }
        ruby {
          init => "@@started = false"
          code => "@@started = true"
        }
      }
      else if "EndMatch" in [message] {
        #end
        mutate {
          add_tag => [ "match_ended" ]
        }
        ruby {
          code => "@@started = false"
        }
      }
      else {
        ruby {
          # tag the event only while the shared flag is set
          code => "if @@started == true
                     event.tag('match_in_progress')
                   end"
        }
      }

Obviously the above does not work, because a single shared flag can't keep track of the variety of different logs and [Match][ID] values coming in.

Even if I set pipeline workers to 1, I don't think this helps, because there are still multiple different [Match][ID] sources coming in at once. The data ends up all over the place.
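The failure can be reproduced outside Logstash. The sketch below simulates one shared flag (like `@@started`) over an interleaved stream, even with strictly ordered, single-threaded processing. The function name and the event hashes are hypothetical, and for simplicity it sets a field on every event rather than tagging only the non-boundary ones.

```ruby
# Simulates a single shared flag across all match IDs; not Logstash code.
def tag_with_global_flag(events)
  started = false # one flag shared by every match ID
  events.map do |event|
    case event[:message]
    when "StartMatch" then started = true
    when "EndMatch"   then started = false
    end
    event.merge(match_in_progress: started)
  end
end

interleaved = [
  { match_id: "A", message: "StartMatch" },
  { match_id: "B", message: "log line of data..." }, # B has not started yet
  { match_id: "A", message: "EndMatch" },
  { match_id: "B", message: "StartMatch" },
  { match_id: "A", message: "log line of data..." }, # A has already ended
]

result = tag_with_global_flag(interleaved)

# Ordinary log lines that were marked in progress
leaked = result.select { |e| e[:message].start_with?("log line") && e[:match_in_progress] }
leaked_ids = leaked.map { |e| e[:match_id] }
p leaked_ids # => ["B", "A"]
```

Both marked log lines are wrong: "B" had not started and "A" had already ended. The state has to be stored per [Match][ID] rather than in one variable.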

For anyone wondering, I solved this with the aggregate filter after all.

I'm not sure if it will work perfectly when I have different logs with different [Match][ID] values, but here's the code.

Apologies for my example further up, where I refer to [Match][ID] as [match_id]. They should all be [Match][ID].

      if [message] == "StartMatch" {
        aggregate {
          task_id => "%{[Match][ID]}"
          code => "event.set('[Match][In_Progress]',true)"
          map_action => "create"
          add_tag => "match_started"
        }
      }
      else if "EndMatch" in [message] {
        aggregate {
          task_id => "%{[Match][ID]}"
          code => "event.set('[Match][In_Progress]',false)"
          map_action => "update"
          add_tag => "match_ended"
          end_of_task => true
          timeout => 5400
          timeout_tags => ["match_not_ended"]
        }
      } else {
        aggregate {
          task_id => "%{[Match][ID]}"
          code => "event.set('[Match][In_Progress]',true)"
          map_action => "update"
        }
      }
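
To make the map lifecycle easier to follow, here is a rough plain-Ruby simulation of the three branches. It is not the plugin's code: `AggregateSim` and the event hashes are hypothetical, it uses the "EndMatch" message from the example documents, it compares the stop message by equality rather than by substring, and it assumes (as I read the plugin documentation) that `map_action => "create"` runs the code only when no map exists yet and `map_action => "update"` runs it only when one does.

```ruby
# Simulation of the three aggregate branches; NOT the plugin's real code.
class AggregateSim
  def initialize
    @maps = {} # task_id => map, one entry per match in progress
  end

  def process(event)
    id = event[:match_id]
    case event[:message]
    when "StartMatch" # map_action => "create"
      return event if @maps.key?(id) # code is skipped if a map already exists
      @maps[id] = {}
      event.merge(in_progress: true)
    when "EndMatch" # map_action => "update", end_of_task => true
      return event unless @maps.key?(id) # code is skipped if no map exists
      @maps.delete(id) # end_of_task removes the map
      event.merge(in_progress: false)
    else # map_action => "update"
      return event unless @maps.key?(id)
      event.merge(in_progress: true)
    end
  end
end

sim = AggregateSim.new
stream = [
  { match_id: "1234", message: "log line of data..." },
  { match_id: "1234", message: "StartMatch" },
  { match_id: "5678", message: "log line of data..." },
  { match_id: "1234", message: "log line of data..." },
  { match_id: "1234", message: "EndMatch" },
  { match_id: "1234", message: "log line of data..." },
]

out = stream.map { |e| sim.process(e)[:in_progress] }
p out # => [nil, true, nil, true, false, nil]
```

Under those assumptions, events outside a match carry no [Match][In_Progress] field at all rather than an explicit false, so that value would have to be set separately if it is needed. The plugin documentation also advises running with a single pipeline worker so that events are processed in order.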