Hi,
I have logs shipping to Logstash via Filebeat, and I do some log enrichment on them before passing them on to Elasticsearch.
One of the enrichments I would like to do is to add a field to every event that falls between two unique events (a start event and a stop event). This also has to take into account a unique ID that applies to the set of documents.
For example, I have logs coming in from multiple sources, and each source has a [Match][ID]. The [Match][ID] remains the same for that set of documents, and the aggregation above should apply per [Match][ID].
Example documents...
"message" : "log line of data..." -- add_field => { "match_in_progress" => false}
"match_id" : "1234"
"message" : "log line of data..." -- add_field => { "match_in_progress" => false}
"match_id" : "1234"
"message" : "log line of data..." -- add_field => { "match_in_progress" => false}
"match_id" : "1234"
"message" : "StartMatch" <----- this indicates my start event
"match_id" : "1234"
"message" : "log line of data..." -- add_field => { "match_in_progress" => true }
"match_id" : "1234"
"message" : "log line of data..." -- add_field => { "match_in_progress" => true }
"match_id" : "1234"
"message" : "log line of data..." -- add_field => { "match_in_progress" => true }
"match_id" : "1234"
... maybe 2,000 more like these -- add_field => { "match_in_progress" => true }
"match_id" : "1234"
"message" : "EndMatch" <----- this indicates my stop event
"message" : "log line of data..." -- add_field => { "match_in_progress" => false}
"match_id" : "1234"
There will be hundreds of different [Match][ID] values occurring at once.
Is it possible to do what I'm trying to achieve within Logstash, with, for example, the aggregate plugin? Or perhaps the elasticsearch filter plugin?
I have tried, but so far it doesn't seem to fit, or I just don't know how to use it properly. It seems more tailored toward adding information to the final event of a task?
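For reference, the kind of aggregate configuration I was picturing is roughly this — a sketch only; I'm assuming task_id can key on match_id and that the in-flight map can carry the flag:

```
filter {
  if [message] == "StartMatch" {
    aggregate {
      task_id => "%{match_id}"
      code => "map['in_progress'] = true"
      map_action => "create"
    }
  } else if [message] == "EndMatch" {
    aggregate {
      task_id => "%{match_id}"
      code => "map['in_progress'] = false"
      end_of_task => true
    }
  } else {
    aggregate {
      task_id => "%{match_id}"
      # only runs if a map already exists for this match_id,
      # i.e. after StartMatch and before EndMatch
      code => "event.set('match_in_progress', map['in_progress'])"
      map_action => "update"
    }
  }
}
```

With map_action => "update", events arriving before StartMatch simply don't get the field (which matches my "false" default), but as I understand it the aggregate filter also requires pipeline.workers set to 1, which brings me back to the ordering problem below.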
I have also tried custom Ruby code, like so:
if [message] == "StartMatch" {
  # start
  mutate {
    add_tag => [ "match_started" ]
  }
  ruby {
    init => "@@started = false"
    code => "@@started = true"
  }
} else if "EndMatch" in [message] {
  # end
  mutate {
    add_tag => [ "match_ended" ]
  }
  ruby {
    code => "@@started = false"
  }
} else {
  ruby {
    code => "event.tag('match_in_progress') if @@started"
  }
}
Obviously the above does not work, because I have such a variety of different logs and [Match][ID] values coming in. Even if I set pipeline workers to 1, I don't think that helps, because there are still multiple different [Match][ID] sources arriving interleaved, so the data ends up all over the place.
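For what it's worth, the closest I can imagine getting with plain ruby filters is to key the state on match_id rather than a single flag — a sketch only, and it still assumes a single pipeline worker so each match's events arrive in order:

```
filter {
  ruby {
    # one shared hash of match_id => in-progress flag,
    # instead of a single global boolean
    init => "@@in_progress = {}"
    code => "
      id = event.get('match_id')
      case event.get('message')
      when 'StartMatch'
        @@in_progress[id] = true
      when 'EndMatch'
        @@in_progress[id] = false
      else
        event.tag('match_in_progress') if @@in_progress[id]
      end
    "
  }
}
```

That at least keeps the hundreds of concurrent [Match][ID] values from trampling each other's state, but it doesn't solve the interleaving problem if workers > 1 — which is really what I'm asking about.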