Syntax for Aggregate Filter Plugin (No End Event)?

Hi Logstash Maestros,

Suppose I have a continuous stream of data records flowing into Logstash, tracking how many widgets a producer has cranked out. The data fields are {timestamp} {producer} {numWidgets} and the data looks like this:

1 – ProdA – 50
2 – ProdB – 150
3 – ProdA – 50
5 – ProdA – 25
20 – ProdA – 35

Records will arrive at Logstash at random moments, and there is no signal for when records start or stop. Timestamp is in seconds.

Producers may generate widgets at any time. If a producer has made at least one widget in the last five seconds, I consider him “live” and I want to keep a tally of all the widgets he’s made while he’s been alive. However, if a producer is silent for more than 5 seconds, I consider him “inactive,” and I don’t bother tracking him any more.

So, in the above example, ProdA would be considered “live” at t=1. ProdB is “live” at t=2. ProdA then produces twice more, adding to his tally. At t=6, ProdA’s tally would be 125 (50+50+25) while ProdB’s tally would be 150. But then, shortly afterwards, ProdB hasn’t produced in over five seconds, so he becomes “inactive” and Logstash stops tracking him. And at t=11, ProdA has been silent too long, and he too is “inactive”. When ProdA produces again at t=20, his tally is a total of 35, because his previous tally expired.
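
The rule above can be sketched as a small standalone Ruby simulation. This is only a toy model of the intended behaviour, not Logstash code: the names `tally_with_inactivity`, `RECORDS`, and `INACTIVITY_LIMIT` are made up for illustration, and expiry is only noticed when the next record arrives.

```ruby
# Toy simulation of the tallying rule described above; not Logstash code.
INACTIVITY_LIMIT = 5 # seconds of silence after which a producer is "inactive"

# Each record is [timestamp_seconds, producer, num_widgets], from the sample data.
RECORDS = [
  [1, 'ProdA', 50],
  [2, 'ProdB', 150],
  [3, 'ProdA', 50],
  [5, 'ProdA', 25],
  [20, 'ProdA', 35]
].freeze

# Returns [producer, final_tally] pairs in the order the tallies are closed.
def tally_with_inactivity(records, limit)
  live = {}     # producer => { tally:, last_seen: }
  expired = []

  records.each do |timestamp, producer, widgets|
    # Close the tally of every producer silent for more than `limit` seconds.
    live.each do |name, state|
      expired << [name, state[:tally]] if timestamp - state[:last_seen] > limit
    end
    live.delete_if { |_, state| timestamp - state[:last_seen] > limit }

    # Start a fresh tally if needed, then add this record to it.
    state = (live[producer] ||= { tally: 0, last_seen: timestamp })
    state[:tally] += widgets
    state[:last_seen] = timestamp
  end

  # Flush whatever is still live once the input ends.
  expired + live.map { |name, state| [name, state[:tally]] }
end

FINAL_TALLIES = tally_with_inactivity(RECORDS, INACTIVITY_LIMIT)
FINAL_TALLIES.each { |name, tally| puts "#{name} | #{tally}" }
```

With the sample data this closes ProdA at 125 and ProdB at 150, then starts a new ProdA tally of 35. Note that the rule is measured from a producer's last record; the aggregate filter also documents an `inactivity_timeout` option that is measured that way, which may be worth checking against the plugin documentation.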

Here’s the code I shamelessly adapted from the Logstash documentation site:

  aggregate {
    task_id => "%{producer}"
    code => "map['tally'] ||= 0; map['tally'] += %{numWidgets};"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "producer"
    timeout => 5         # 5 second timeout
    timeout_tags => ['_aggregatetimeout']
    # timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
  }

Note the timeout_code is just commented out; I’m not sure what that line is doing, or if I need it.
When I apply the above, and then look at the data in my Elasticsearch, it sure doesn’t look aggregated:

sql> select producer, tally
>  from \"myindex2020.06.29\"
>  where \"@timestamp\" >= NOW()- INTERVAL 1 MINUTES;
ProdA   |null
ProdB   |null
ProdA   |null
ProdA   |null

Clearly I’m doing something wrong. Can anyone spot my error? Thanks!

Replace %{numWidgets} with event.get('numWidgets')
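
A likely reason for this, sketched in plain Ruby: the `code` option is evaluated as Ruby, and in Ruby `%{numWidgets}` is a percent-literal string containing the text "numWidgets", not the value of the field. The sketch assumes Logstash does not substitute `%{field}` references inside the `code` string; `try_tally` and `LITERAL` are hypothetical names used only here.

```ruby
# In plain Ruby, %{...} is a percent-literal string, not a field reference.
LITERAL = %{numWidgets}

# Mimics the body of the aggregate `code` option with a given increment.
# Returns the new tally, or the exception class if the addition fails.
def try_tally(increment)
  map = {}
  map['tally'] ||= 0
  map['tally'] += increment
  map['tally']
rescue TypeError => e
  e.class
end

puts LITERAL.inspect            # the literal text, not a number
puts try_tally(LITERAL).inspect # adding a String to an Integer fails
puts try_tally(50).inspect      # a numeric value, as event.get would return for a numeric field
```

The same failure would occur if `event.get('numWidgets')` returned a string rather than a number, in which case converting it first (for example with `.to_i`) would be needed.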

Note that an aggregate filter checks whether timeouts have occurred every five seconds, so you cannot have a timeout shorter than that.
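
To illustrate the effect of a periodic check, here is a toy Ruby model. The five-second interval comes from the note above; the assumptions that checks fall on multiples of the interval and that a task expires once its age is strictly greater than the timeout are mine, and `first_sweep_noticing_expiry` is a hypothetical helper, not part of the plugin.

```ruby
# Toy model: expiry is only noticed at periodic sweeps.
SWEEP_INTERVAL = 5 # seconds between timeout checks, per the note above

# Returns the time of the first sweep at which a task created at `created_at`
# is seen as expired (age strictly greater than `timeout`).
# Assumes sweeps happen at multiples of `sweep_interval` starting from t=0.
def first_sweep_noticing_expiry(created_at, timeout, sweep_interval = SWEEP_INTERVAL)
  sweep = sweep_interval
  sweep += sweep_interval until sweep - created_at > timeout
  sweep
end

puts first_sweep_noticing_expiry(1, 5)
puts first_sweep_noticing_expiry(1, 60)
```

Under these assumptions a task can outlive its configured timeout by up to one sweep interval, which matters much more for a 5-second timeout than for a 60-second one.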

Thanks Badger! I've made the modification; now my code is this:

  aggregate {
    task_id => "%{producer}"
    code => "map['tally'] ||= 0; map['tally'] += event.get('numWidgets');"   #  <---- MODIFIED
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "producer"
    timeout => 5         # 5 second timeout
    timeout_tags => ['_aggregatetimeout']
    # timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
  }

But an inspection of the data suggests that the modified line isn't taking effect:

sql> select *
>  from \"myindex2020.06.30\"
>  where \"@timestamp\" >= NOW()- INTERVAL 1 MINUTES;
ProdA   |  50
ProdB   | 150
ProdA   |  50
ProdA   |  25

I was expecting to see something like this:

sql> select *
>  from \"myindex2020.06.30\"
>  where \"@timestamp\" >= NOW()- INTERVAL 1 MINUTES;
ProdA   | 125
ProdB   | 150

...where the data is aggregated and I see a new field named "tally" in the data. I must still be missing something... Thanks!

I suspect the problem is the very short timeout. Increase the timeout to 60 seconds and see if it starts aggregating then.

Yes! You were absolutely right; the timeout was too brief. Setting it to sixty seconds fixed the problem. Thank you!!!
