Using aggregate to add data to previous event

Hi, I have a logfile that looks like this:

61f3483e-e7f4-4202-b34a-d85108f0fea9 - some log line
61f3483e-e7f4-4202-b34a-d85108f0fea9 - some other log line ID=123456
61f3483e-e7f4-4202-b34a-d85108f0fea9 - another log line

The first part is an execution ID, then there is the log text and one of the logs has an ID that came from the request. I am trying to get that ID and add it as a value on all the events, including the ones before it.

My filter looks like this (the execution ID is dissected before), it is able to add the value to the events after that one but not the ones before.

    grok {
        match => { "message" => "ID=(?<id>\d{6})" }
    }

    if "_grokparsefailure" in [tags] {
        aggregate {
            task_id => "%{execid}"
            code => "event.set('id', map['id'])"
            timeout => 5
            push_map_as_event_on_timeout => true
        }
    } else {
        aggregate {
            task_id => "%{execid}"
            code => "map['id'] = event.get('id')"
        }
    }

If I add a sleep before the second aggregate it kinda works but logstash eventually stalls and stops working correctly due to the high number of events.
Is there a way to make aggregate wait until it gets the event with the ID it needs and then run the code for all the previous events?

Thanks.

Do it in a single filter. Collect all the lines related to the id, then split them when the timeout occurs.

    grok { match => { "message" => "^%{NOTSPACE:execid} - " } }
    grok { match => { "message" => "ID=(?<id>\d{6})" } }
    aggregate {
        task_id => "%{execid}"
        timeout => 5
        push_map_as_event_on_timeout => true
        code => '
            map["message"] ||= []
            map["message"] << event.get("message")
            id = event.get("id")
            if id
                map["id"] = id
            end
            event.cancel
        '
    }
    split { field => "message" }
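As an added example that is not part of the original thread, here is a stand-alone Ruby simulation of the buffer-then-backfill logic in that aggregate block (the `Aggregator` class, the regexes standing in for the two grok filters, and the extra execution that never logs an ID are all constructed for this example). It shows that lines seen before the `ID=` line still receive the id, and that a task whose timeout elapses without an id is flushed with `id` nil rather than lost.

```ruby
# Added example, not from the thread: a stand-alone Ruby simulation of the
# aggregate pattern above. Lines are buffered per execid, the request ID is
# remembered when it appears, and on timeout one event per buffered line is
# emitted with the ID back-filled, including lines seen before the ID.
LINE_RE = /\A(?<execid>\S+) - (?<message>.*)\z/   # stands in for the first grok
ID_RE   = /ID=(?<id>\d{6})/                        # stands in for the second grok

class Aggregator
  def initialize(timeout)
    @timeout = timeout
    @maps = {} # execid => { lines:, id:, last_seen: }  (the aggregate "map")
  end

  # Feed one raw line observed at time `now` (seconds).
  # Returns the events flushed because some task's timeout elapsed.
  def feed(line, now)
    flushed = flush_expired(now)
    return flushed unless (m = LINE_RE.match(line))
    task = @maps[m[:execid]] ||= { lines: [], id: nil }
    task[:lines] << m[:message]                      # map["message"] << ...
    idm = ID_RE.match(m[:message])
    task[:id] = idm[:id] if idm                      # map["id"] = id
    task[:last_seen] = now
    flushed
  end

  # push_map_as_event_on_timeout + split: one event per line, oldest task first.
  def flush_expired(now)
    expired = @maps.select { |_, t| now - t[:last_seen] >= @timeout }
    expired.each_key { |k| @maps.delete(k) }
    expired.flat_map do |execid, t|
      t[:lines].map { |msg| { execid: execid, message: msg, id: t[:id] } }
    end
  end
end

# Constructed sample: the thread's three lines plus an execution that never logs an ID.
SAMPLE_LOG = [
  [0.0, "61f3483e-e7f4-4202-b34a-d85108f0fea9 - some log line"],
  [0.5, "61f3483e-e7f4-4202-b34a-d85108f0fea9 - some other log line ID=123456"],
  [1.0, "61f3483e-e7f4-4202-b34a-d85108f0fea9 - another log line"],
  [1.2, "aaaaaaaa-0000-0000-0000-000000000001 - request that never logs an ID"],
]

def run_sample(timeout = 5)
  agg = Aggregator.new(timeout)
  out = SAMPLE_LOG.flat_map { |ts, line| agg.feed(line, ts) }
  out + agg.flush_expired(SAMPLE_LOG.last[0] + timeout) # end of input: drain
end
```

Running `run_sample` yields four events: the three lines of the first execution all carry `id: "123456"`, and the lone line of the second execution comes out with `id: nil`, which is the case to watch for downstream when an id is required.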

That worked perfectly, thank you.
