[Logstash] Aggregate filter - no timeout

Hi,
In Logstash 7.9.3 I would like to create a filter similar to this example: https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html#plugins-filters-aggregate-example3
I want to create an index of interfaces and their state. Sometimes the state changes really fast, and a simple action => "update" in the output is not enough.
I created an aggregate filter in my pipeline, but unfortunately it doesn't work as described in the docs.

  aggregate {
    task_id => "%{_id}"
#    code => "map['temp_state'] = event.get('state'); map['temp_state_numerically'] = event.get('state_numerically');"
    code => "event.set('test', 'test1');"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "_id"
    timeout => 300 
    inactivity_timeout => 300
    timeout_tags => ['_aggregatetimeout']
    timeout_timestamp_field => "@timestamp"
    timeout_code => "event.set('test2', 'test_2');"
#    timeout_code => "event.set('state', event.get('temp_state')); event.set('state_numerically', event.get('temp_state_numerically'));"
  }

To check how it works I added a "test" field. The result is that the test field is created, but the test2 field and the _aggregatetimeout tag are never created. I don't know why the timeout code isn't executed. It seems the timeout is never reached, but how is that possible?

Can you help me?
Patryk

Is that field set on the event or not? You say it is and you also say it is not. If it is not then the most likely answer is that the [_id] field does not exist.

Sorry, I made a mistake in the text; I have already corrected it.

The timeout does not always occur. Is the pipeline still running, or has it terminated because it has completed all of its work? What inputs are you using?

The pipeline is still running, and there is no end event as in the 3rd example. At the beginning I have a pipeline for all network logs. When there is action==UPDOWN, I add a "state" tag, and in the output section those "state" logs go to a pipeline output. For the state logs I have a second pipeline with a pipeline input:

input {
  pipeline {
    address => "network-devices-state"
    tags => ["state-lshjn"]
  }

}
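
For context, the sending side in the first pipeline would look roughly like the sketch below. This is an assumption based on the description above, not the actual config: only the address "network-devices-state" and the "state" tag come from this thread.

  output {
    # Assumed conditional: route only events tagged "state" to the second pipeline.
    if "state" in [tags] {
      pipeline {
        # Must match the address of the pipeline input shown above.
        send_to => ["network-devices-state"]
      }
    }
  }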

I know that the clock on the Logstash host is slightly out of sync, by about +1-2 minutes. Could that be the problem?

Ehh, such a stupid error; I spent hours trying to find the problem... "timeout_tags" doesn't add another tag but overwrites everything. I have an if statement in the output which checks tags before writing to the index, so when the aggregate filter overwrote the tags, the pipeline couldn't write that data to my index :frowning:

That is not quite true. When a timeout occurs, a new event is created using the map entry. Unless you added a field called [tags] to the map, it will not exist on the new event. If you changed the filter to use

code => 'map["tags"] = [ "foo", "bar" ];'

then I would be very, very surprised if that got overwritten rather than having a third tag added. (I always use single quotes around the code block in case I want to use Ruby string magic, which only works in strings surrounded with double quotes.)

Even now I sometimes forget this, and it is not until I see the timeout events that I realize they do not have some of the fields common to all the original events.
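
Putting that together, a minimal sketch of the filter from the first post could copy the original tags into the map, so that the timeout event still passes a tag check in the output. This is untested and assumes the incoming events already carry the tags the output conditional looks for (for example "state-lshjn" from the pipeline input). The field and option names are the ones used earlier in this thread.

  filter {
    aggregate {
      task_id => "%{_id}"
      # Single quotes around the code block, double quotes inside it.
      code => '
        # Keep the latest state seen for this task.
        map["state"] = event.get("state")
        map["state_numerically"] = event.get("state_numerically")
        # Remember the tags of the first event so the timeout event has a [tags] field.
        map["tags"] ||= event.get("tags")
      '
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "_id"
      timeout => 300
      inactivity_timeout => 300
      # Expected to be added alongside whatever is already in map["tags"].
      timeout_tags => ["_aggregatetimeout"]
      timeout_timestamp_field => "@timestamp"
    }
  }

Any other field the output or later filters rely on would have to be copied into the map the same way. Alternatively, the output conditional could test for "_aggregatetimeout" in [tags] directly.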