Dropping Events in between Start Event & End Event

(karnamonkster) #1


My setup: Logstash/Elasticsearch/Kibana 6.4.3
These are sample logs recorded from a source,
where alarmstate: 0 means the alarm is open and alarmstate: 1 means the alarm is closed:
alarm: id1
alarmstate: 0
deviceID: 123
Time: April 10th 2019, 11:30:00.000

alarm: id1
alarmstate: 0 
deviceID: 123
Time: April 10th 2019, 11:40:00.000

alarm: id1
alarmstate: 0 
deviceID: 123
Time: April 10th 2019, 11:50:00.000 

alarm: id1
alarmstate: 1 
deviceID: 123
Time: April 10th 2019, 12:00:00.000 

For this scenario, I want to drop the events in between that repeat the same state, and retain only the first occurrence of the alarm-open event and the alarm-closed event,
i.e. drop every alarmstate: 0 event after the alarm is first opened (and vice versa for repeated alarmstate: 1 events).
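
A minimal Ruby sketch of the rule described above, assuming the events are already parsed into hashes and arrive in time order (the `collapse_repeats` helper is hypothetical, and the timestamps are rewritten in ISO form): an event is kept only when its alarmstate differs from the last state kept for the same deviceID and alarm.

```ruby
# Hypothetical helper: keep only the events where the alarm state changes,
# tracked separately for every (deviceID, alarm) pair.
def collapse_repeats(events)
  last_state = {} # "deviceID+alarm" => last kept alarmstate
  events.select do |e|
    key = "#{e['deviceID']}+#{e['alarm']}"
    changed = last_state[key] != e['alarmstate']
    last_state[key] = e['alarmstate'] if changed
    changed
  end
end

# The four sample events from above, in arrival (= time) order.
events = [
  { 'alarm' => 'id1', 'alarmstate' => 0, 'deviceID' => '123', 'Time' => '2019-04-10T11:30:00' },
  { 'alarm' => 'id1', 'alarmstate' => 0, 'deviceID' => '123', 'Time' => '2019-04-10T11:40:00' },
  { 'alarm' => 'id1', 'alarmstate' => 0, 'deviceID' => '123', 'Time' => '2019-04-10T11:50:00' },
  { 'alarm' => 'id1', 'alarmstate' => 1, 'deviceID' => '123', 'Time' => '2019-04-10T12:00:00' }
]

kept = collapse_repeats(events)
kept.each { |e| puts "#{e['Time']} alarmstate=#{e['alarmstate']}" }
# prints:
# 2019-04-10T11:30:00 alarmstate=0
# 2019-04-10T12:00:00 alarmstate=1
```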

(Badger) #2

Assuming you use a multiline filter to group those lines into events, you can then use an aggregate filter:

    grok { match => { "message" => "^Time: %{GREEDYDATA:[@metadata][ts]}" } }

    # Strip out th from 10th etc.
    mutate { gsub => [ "[@metadata][ts]", "([A-Z][a-z]+ [0-9]{1,2})[a-z]{2}(.*)", "\1\2" ] }
    date { match => [ "[@metadata][ts]", "MMMM d YYYY, HH:mm:ss.SSS", "MMMM d YYYY, HH:mm:ss.SSS " ] }

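    # Remove the Time line so kv only sees "key: value" lines
    mutate { gsub => [ "message", "Time: .*", "" ] }
    # field_split contains a literal newline (the quoted string spans a line break)
    kv { value_split => ": " field_split => "
 " }
    aggregate {
        task_id => "%{deviceID}+%{alarm}"
        code => '
            # 2 never matches a real alarmstate, so the first event per task is always kept
            map["alarmstate"] ||= 2
            state = event.get("alarmstate")
            if state == map["alarmstate"]
                # Same state as the last event kept for this deviceID+alarm: drop it
                event.cancel
            else
                # State changed (open to closed or closed to open): keep it and remember it
                map["alarmstate"] = state
            end
        '
    }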

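To see what each step of that configuration does to one record, here is a rough plain-Ruby equivalent of the grok, mutate, date and kv steps. It is not Logstash code: Ruby's `Time.strptime` stands in for the Joda patterns of the date filter, the trailing space is removed with `strip` rather than a second date pattern, and it assumes the multiline stage joins the lines of one record with newlines.

```ruby
require 'time'

# One multiline record as the multiline stage would hand it over
# (lines joined with "\n"; note the trailing space on the Time line).
message = "alarm: id1\nalarmstate: 0 \ndeviceID: 123\nTime: April 10th 2019, 11:50:00.000 "

# grok: capture the text after "Time: " (^ and $ are line anchors in Ruby).
ts = message[/^Time: (.*)$/, 1]

# mutate/gsub: strip the ordinal suffix ("10th" -> "10"), then the trailing space.
ts = ts.sub(/([A-Z][a-z]+ [0-9]{1,2})[a-z]{2}(.*)/, '\1\2').strip

# date: parse the cleaned timestamp (no zone given, so it is read as local time).
timestamp = Time.strptime(ts, '%B %d %Y, %H:%M:%S.%L')

# mutate/gsub + kv: drop the Time line, then split each line into key and value.
pairs = message.sub(/Time: .*/, '').split("\n").reject(&:empty?)
               .map { |line| line.split(': ', 2).map(&:strip) }
fields = pairs.to_h

puts timestamp.strftime('%Y-%m-%d %H:%M:%S.%L') # prints 2019-04-10 11:50:00.000
fields.each { |k, v| puts "#{k}=#{v}" }          # alarm=id1, alarmstate=0, deviceID=123
```

As with kv, every value comes out as a string, so the aggregate code ends up comparing "0" and "1" rather than integers; the sentinel 2 never matches either.
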
(Guy Boertje) #3

It will help you find an answer once you realise that Logstash processing is batch-oriented and inherently unordered. You can't rely on events being handled strictly in sequence unless you set pipeline.workers to 1 and accept the throughput hit that comes with a single filter/output worker.
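
A small Ruby illustration of why ordering matters, using a hypothetical `collapse_repeats` helper that applies the same keep-on-state-change rule as the aggregate code: if one of the repeated "open" events overtakes the first one (for example because it was handled in a different batch), the rule still runs without error but keeps the wrong start event.

```ruby
# Keep an event only when its state differs from the last kept state for the same key.
def collapse_repeats(events)
  last_state = {}
  events.select do |e|
    key = "#{e[:device]}+#{e[:alarm]}"
    changed = last_state[key] != e[:state]
    last_state[key] = e[:state] if changed
    changed
  end
end

# Hypothetical shorthand for one sample event.
def ev(time, state)
  { alarm: 'id1', device: '123', state: state, time: time }
end

in_order  = [ev('11:30', 0), ev('11:40', 0), ev('11:50', 0), ev('12:00', 1)]
# Same events, but the 11:40 one arrives before the 11:30 one.
reordered = [ev('11:40', 0), ev('11:30', 0), ev('11:50', 0), ev('12:00', 1)]

puts collapse_repeats(in_order).map { |e| e[:time] }.inspect  # ["11:30", "12:00"]
puts collapse_repeats(reordered).map { |e| e[:time] }.inspect # ["11:40", "12:00"]
```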

The aggregate filter can do this; however, it would be easier to think in terms of building a single Elasticsearch document per "event". This does affect the kinds of queries and visualisations you can do in Kibana: instead of having discrete event docs in ES and then doing aggregations in ES/Kibana, you create the aggregated (composite) document in the aggregate filter from the individual elements that describe the real-world event/occurrence.
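
A rough plain-Ruby sketch of the composite-document idea, assuming in-order events and using hypothetical field names (`start`, `end`, `duration_s`): open elements are remembered per deviceID+alarm, and a close element produces one document for the whole occurrence. It illustrates the logic only; it is not aggregate filter configuration.

```ruby
require 'time'

# Fold the open/close elements of each alarm occurrence into one composite
# document. Field names start/end/duration_s are hypothetical.
def build_occurrences(events)
  open_since = {} # "deviceID+alarm" => time of the first open element
  docs = []
  events.each do |e|
    key = "#{e['deviceID']}+#{e['alarm']}"
    if e['alarmstate'] == 0
      # Only the first open element counts; repeats are ignored.
      open_since[key] ||= e['Time']
    elsif (started = open_since.delete(key))
      # A close element ends the occurrence and emits one document.
      docs << {
        'deviceID' => e['deviceID'], 'alarm' => e['alarm'],
        'start' => started, 'end' => e['Time'],
        'duration_s' => (Time.iso8601(e['Time']) - Time.iso8601(started)).to_i
      }
    end
    # A close with no recorded open falls through and is dropped.
  end
  docs
end

events = [
  { 'alarm' => 'id1', 'alarmstate' => 0, 'deviceID' => '123', 'Time' => '2019-04-10T11:30:00Z' },
  { 'alarm' => 'id1', 'alarmstate' => 0, 'deviceID' => '123', 'Time' => '2019-04-10T11:40:00Z' },
  { 'alarm' => 'id1', 'alarmstate' => 0, 'deviceID' => '123', 'Time' => '2019-04-10T11:50:00Z' },
  { 'alarm' => 'id1', 'alarmstate' => 1, 'deviceID' => '123', 'Time' => '2019-04-10T12:00:00Z' }
]

docs = build_occurrences(events)
docs.each { |d| puts "#{d['deviceID']} #{d['alarm']}: #{d['start']} -> #{d['end']} (#{d['duration_s']} s)" }
# prints: 123 id1: 2019-04-10T11:30:00Z -> 2019-04-10T12:00:00Z (1800 s)
```

Ignoring a close that has no preceding open is just one choice made for this sketch; how to treat that case is a design decision.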

Let me know if this is what you want and I can help further.

There are other solutions that use the memcached filter to memoize that a previous element was seen and, if so, drop the element (thereby storing unaggregated elements in ES), but these also require a single worker.

(karnamonkster) #4

Hi @Badger @guyboertje,

Thanks so much for the help. I tried the aggregate filter and ran it with a single worker. The good thing is that the logic works fine, apart from some missing data.
For example, for some devices:

    {"alarm": "id1", "alarmstate": 0, "deviceID": "123", "Time": "2019-04-11T12:50:00"}   ### the actual START event, but it is NOT kept
    {"alarm": "id1", "alarmstate": 0, "deviceID": "123", "Time": "2019-04-11T13:00:00"}   ### this one is kept as the START instead
    {"alarm": "id1", "alarmstate": 1, "deviceID": "123", "Time": "2019-04-11T13:10:00"}   ### correctly kept as the END event

Could the reason be that I am testing this by pulling data from another ES instance where I have all the data? What can I do to make sure I am not missing any data?

And thanks once again, guys.

(karnamonkster) #5

Hi @guyboertje @Badger,

Can you please confirm whether having 2 filters in this Logstash configuration, while running with a single pipeline worker (pipeline.workers: 1), may result in the sequence being skipped? I can now see records in between that are not dropped.
Do I need to set a timeout? I don't have one set explicitly in the configuration.

I think we are almost there, but I still need to figure out how to handle a huge number of deviceIDs (~500,000).