Logstash Filter: How To Add Tag For “If X Event Happens More Than Y Times In Z Seconds”

I am new to Logstash and am trying to write a filter that will add a tag when event X is seen more than Y times in Z seconds.

For instance, in the following example log, I want to apply a tag to traffic from the IP address 1.2.3.4 because the same event (EVENT_ID 1132) occurred more than 5 times in less than 30 minutes:

Firewall Log EVENT_ID 1132 TIME 11:34:56 SRC_IP 1.2.3.4 SRC_PORT 45729 DST_IP 9.8.7.6 DST_PORT 80 DROP
Firewall Log EVENT_ID 1132 TIME 11:34:57 SRC_IP 1.2.3.4 SRC_PORT 45730 DST_IP 9.8.7.6 DST_PORT 443 DROP
Firewall Log EVENT_ID 1132 TIME 11:34:58 SRC_IP 1.2.3.4 SRC_PORT 45731 DST_IP 9.8.7.6 DST_PORT 22 DROP
Firewall Log EVENT_ID 1132 TIME 11:36:06 SRC_IP 2.3.4.5 SRC_PORT 41446 DST_IP 6.5.4.3 DST_PORT 80 DROP
Firewall Log EVENT_ID 1132 TIME 11:37:12 SRC_IP 1.2.3.4 SRC_PORT 45732 DST_IP 8.7.6.5 DST_PORT 80 DROP
Firewall Log EVENT_ID 1132 TIME 11:37:13 SRC_IP 1.2.3.4 SRC_PORT 45734 DST_IP 8.7.6.5 DST_PORT 443 DROP
Firewall Log EVENT_ID 1132 TIME 11:37:14 SRC_IP 1.2.3.4 SRC_PORT 45736 DST_IP 8.7.6.5 DST_PORT 22 DROP

I have looked at the filter page (https://www.elastic.co/guide/en/logstash/current/filter-plugins.html), specifically at the aggregate, elapsed and meter functions, but I would love some input on how to best accomplish this and not DoS my Logstash server.

My ultimate goal is to send specific events that meet the criteria (event X happens more than Y times in Z seconds) to another ES instance. If there is a better way than using tags I am all ears.

Thank you!

NOTE: I also posted this over at SO before finding this forum: http://stackoverflow.com/q/38776331/3133298
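An added worked example, not from the thread above and not Logstash's own aggregate, elapsed, meter or collate code: the "more than Y in Z seconds" check written as a sliding-window counter in plain Ruby, the language a Logstash ruby filter's code block is written in, run over the log lines posted in the question (re-typed here as constructed input). The key, thresholds, tag name and class/method names are my own choices; state is kept per (EVENT_ID, SRC_IP) and old timestamps are evicted on every event so memory stays bounded.

```ruby
# Sliding-window "X more than Y times in Z seconds" tagging, in plain Ruby.
# Added example; the sample lines below are the ones from the question,
# constructed here as inline input so the script runs offline.

SAMPLE_LOG = <<~LOG
  Firewall Log EVENT_ID 1132 TIME 11:34:56 SRC_IP 1.2.3.4 SRC_PORT 45729 DST_IP 9.8.7.6 DST_PORT 80 DROP
  Firewall Log EVENT_ID 1132 TIME 11:34:57 SRC_IP 1.2.3.4 SRC_PORT 45730 DST_IP 9.8.7.6 DST_PORT 443 DROP
  Firewall Log EVENT_ID 1132 TIME 11:34:58 SRC_IP 1.2.3.4 SRC_PORT 45731 DST_IP 9.8.7.6 DST_PORT 22 DROP
  Firewall Log EVENT_ID 1132 TIME 11:36:06 SRC_IP 2.3.4.5 SRC_PORT 41446 DST_IP 6.5.4.3 DST_PORT 80 DROP
  Firewall Log EVENT_ID 1132 TIME 11:37:12 SRC_IP 1.2.3.4 SRC_PORT 45732 DST_IP 8.7.6.5 DST_PORT 80 DROP
  Firewall Log EVENT_ID 1132 TIME 11:37:13 SRC_IP 1.2.3.4 SRC_PORT 45734 DST_IP 8.7.6.5 DST_PORT 443 DROP
  Firewall Log EVENT_ID 1132 TIME 11:37:14 SRC_IP 1.2.3.4 SRC_PORT 45736 DST_IP 8.7.6.5 DST_PORT 22 DROP
LOG

# Field layout of the posted lines; a grok pattern would do the same job in Logstash.
LINE_RE = /EVENT_ID (?<event_id>\d+) TIME (?<time>\d\d:\d\d:\d\d) SRC_IP (?<src_ip>\S+) SRC_PORT (?<src_port>\d+) DST_IP (?<dst_ip>\S+) DST_PORT (?<dst_port>\d+) (?<action>\S+)/

# The sample carries HH:MM:SS only, so timestamps are seconds since midnight
# (a real pipeline would use @timestamp with a date).
def hms_to_seconds(hms)
  h, m, s = hms.split(':').map(&:to_i)
  h * 3600 + m * 60 + s
end

# Returns a hash of string-keyed fields (like a Logstash event), or nil if the line does not match.
def parse_line(line)
  m = LINE_RE.match(line) or return nil
  fields = m.named_captures
  fields['seconds'] = hms_to_seconds(fields['time'])
  fields
end

# Per-key sliding window: tag an event when more than `threshold` events with the
# same key have been seen within the last `window_seconds`, this one included.
class ThresholdTagger
  def initialize(threshold:, window_seconds:, tag:)
    @threshold = threshold
    @window = window_seconds
    @tag = tag
    @buckets = Hash.new { |h, k| h[k] = [] } # key => timestamps still inside the window
  end

  # Hypothetical key choice: same event id from the same source address.
  def key_for(event)
    [event['event_id'], event['src_ip']]
  end

  def process(event)
    now = event['seconds']
    q = @buckets[key_for(event)]
    q << now
    q.reject! { |t| t < now - @window }     # evict anything older than the window (order-independent)
    if q.size > @threshold                  # "more than Y", so the (Y+1)th event is the first tagged
      event['tags'] = (event['tags'] || []) | [@tag]
    end
    event
  end

  # Drop keys that have gone quiet so the state table cannot grow without bound
  # (the memory/DoS concern in the question); call it on a timer or every N events.
  # Returns the number of keys still tracked.
  def prune(now)
    @buckets.delete_if { |_, q| q.last < now - @window }
    @buckets.size
  end
end

# Run the sample through the tagger and return the processed events in order.
def tag_sample(threshold: 5, window_seconds: 1800, tag: 'repeat_offender')
  tagger = ThresholdTagger.new(threshold: threshold, window_seconds: window_seconds, tag: tag)
  SAMPLE_LOG.each_line.map { |l| parse_line(l) }.compact.map { |e| tagger.process(e) }
end

if __FILE__ == $0
  tag_sample.each do |e|
    puts "#{e['time']} #{e['src_ip']} -> #{e['dst_ip']}:#{e['dst_port']} tags=#{e['tags'].inspect}"
  end
end
```

Two things to notice when running it. With "more than 5 in 30 minutes" only the sixth 1.2.3.4 line (11:37:14) gets the tag: the five earlier lines have already left the filter by the time the threshold is crossed, so a per-event tag can only mark the crossing event and everything after it, never the events that led up to it. If the earlier events are needed too, the aggregate filter's push-on-timeout style of emitting one summary event per key is the closer fit, and the tag then goes on that summary. Inside a Logstash ruby filter the same logic would keep the tagger in an instance variable created in the filter's `init` option and read/write fields with `event.get` and `event.set` from the `code` option; without a prune step the per-key table is the thing that grows.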

Did you take a look at https://www.elastic.co/guide/en/logstash/current/plugins-filters-collate.html as well?

Hi Mark,

I did look at it and at first I didn't believe this would work. However, after re-reading about the collate filter I am going to test it, my assumptions might have been wrong.

Thanks!