Stateful contextual tagging

Hi,

I have a logfile a bit like this:

...
Starting task  123456
Reticulating splines
Sequencing Particles
Computing Optimal Bin Packing
Integrating Curves
Finished processing task 123456
Waiting for another task
Starting task 314159
...

I have start and end markers (with a task ID like 123456) and multiple events in between. The lines in between the markers all relate to that task ID. Is there an existing filter or codec that will allow me to generate an event per line but add a tag representing the current task ID 'context'? i.e. I'd want to get out events like:

{
  "message": "Reticulating splines",
  "task_id": "123456"
}

and

{
  "message": "Sequencing Particles",
  "task_id": "123456"
}

The multiline and aggregate plugins, as far as I can tell, let me collapse the six lines into a single event (with the relevant task ID), but I'm looking for something that preserves the original events and annotates them with additional tags.

If not, I imagine I can write my own filter or codec to do this. I'm not yet sure which would be more appropriate, as I've never done this and I'm only getting started with ELK.
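For what it's worth, the stateful pass I have in mind is easy to express in plain Ruby. This is just a sketch of the logic, not the Logstash filter API, and the class and field names are my own:

```ruby
# Sketch of the tagging logic (plain Ruby, not a real Logstash filter):
# remember the task ID from each "Starting task" marker and attach it
# to every following line until the matching "Finished" marker.
class ContextTagger
  def initialize
    @task_id = nil
  end

  # Turns one log line into one event hash, tagged with the current task.
  def tag(line)
    @task_id = $1 if line =~ /^Starting task\s+(\d+)/
    event = { "message" => line, "task_id" => @task_id }
    @task_id = nil if line =~ /^Finished processing task/
    event
  end
end
```

Feeding the sample lines through this one at a time yields one event per input line, each carrying the task_id of the enclosing Starting/Finished pair.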

I'd also be interested in a slightly more complex example where task logs may be multiplexed into the same log, but I can associate each logged event with the task ID by matching on some other key in the line, e.g. a logged thread ID:

[tid=111] Starting task  123456
[tid=222] Starting task  314159
[tid=222] Reticulating splines
[tid=111] Reticulating splines
...

(Lines 1 and 4 should be tagged with task 123456, lines 2 and 3 with 314159).
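In plain Ruby the multiplexed variant would just key the state on the thread ID instead of on a single "current task". Again this is only the logic I'm after, with names of my own choosing, not a real plugin:

```ruby
# Sketch: track one current task per thread ID, and tag each line
# with the task that its tid is currently working on.
class MuxContextTagger
  def initialize
    @task_by_tid = {}  # tid => task_id
  end

  def tag(line)
    tid = line[/\[tid=(\d+)\]/, 1]
    if (task = line[/Starting task\s+(\d+)/, 1])
      @task_by_tid[tid] = task
    end
    { "message" => line, "task_id" => @task_by_tid[tid] }
  end
end
```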

Thanks

I wonder if the aggregate filter would help here.

I don't think so - I think aggregate emits a single event for all aggregated lines.
I'm looking to spit out as many lines as I have going in, but add context info on the way.

I'm wondering if it can be done with the ruby plugin. I don't know if it will allow me to save some state (some variable) across events.

> I don't think so - I think aggregate emits a single event for all aggregated lines.
> I'm looking to spit out as many lines as I have going in, but add context info on the way.

I was hoping that one could find a corner case where it emits all events anyway.

> I'm wondering if it can be done with the ruby plugin. I don't know if it will allow me to save some state (some variable) across events.

I don't think it's possible to persist state from the ruby plugin.

I really think the aggregate plugin could help you.
I know this plugin very well since I am its creator.

With the aggregate plugin, you can capture a piece of information from one log line and make it available to all the other log lines.

To answer your need, give this configuration a try:

filter {
  if [message] =~ "^Starting task" {
    grok {
      # "task +" copes with the double space in "Starting task  123456"
      match => [ "message", "Starting task +%{INT:task_id}" ]
    }
    aggregate {
      task_id    => "no_correlation_id"
      code       => "map['task_id'] = event['task_id']"
      map_action => "create"
    }
  }
  else if [message] =~ "^Finished processing task " {
    aggregate {
      task_id     => "no_correlation_id"
      code        => "event['task_id'] = map['task_id']"
      map_action  => "update"
      end_of_task => true
    }
  }
  else {
    aggregate {
      task_id    => "no_correlation_id"
      code       => "event['task_id'] = map['task_id']"
      map_action => "update"
    }
  }
}
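To make the map mechanics concrete, here is roughly what the three aggregate blocks do, sketched in plain Ruby. The real plugin manages the shared map keyed by task_id; this only mimics the create/update/end_of_task flow, and the variable names are mine:

```ruby
# Rough plain-Ruby mimic of the aggregate map flow above:
# "create" stores task_id in the shared map on the start marker,
# "update" copies it onto in-between events, end_of_task clears it.
maps = {}  # aggregate task_id key => map hash

process = lambda do |event|
  key = "no_correlation_id"
  case event["message"]
  when /^Starting task\s+(\d+)/
    event["task_id"] = $1
    maps[key] = { "task_id" => $1 }            # map_action => "create"
  when /^Finished processing task/
    event["task_id"] = maps[key]["task_id"]    # map_action => "update"
    maps.delete(key)                           # end_of_task => true
  else
    event["task_id"] = maps[key]["task_id"] if maps[key]
  end
  event
end
```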

One important point: you must be sure to have only one filter worker.
So when starting Logstash, set this command-line flag: "-w 1"


Would it, for example, be possible to store the state in ES and search for it there, e.g. from an elasticsearch or ruby filter, and then have more workers?

The reason why you must have only one filter worker is to be sure that events are processed in the right order.
With several filter workers (so several threads), nothing prevents event 2 from being processed before event 1.

If you store state in Elasticsearch, you must be sure to store events in the right order, with an incremental ID (or something like that) so that you can retrieve that order later.
I don't think that's possible with multiple filter workers...

Yes, correct, a race condition might easily arise, especially if correlated events are close in time :+1: hence the restriction to only one worker.