Aggregate filter bug with java execution engine

Hi all,
I was going crazy trying to debug an aggregate filter that worked smoothly on other Logstash instances.
In my new ELK cluster, v7.1.1, this filter hit some kind of synchronization error: sometimes it aggregated the start event with the end event of the previous transaction, and sometimes it simply pushed events through without aggregating them at all. I tried everything to fix this behaviour, until I recalled that in Logstash 7 the old Ruby execution engine was replaced with the new Java engine.
So I disabled the Java engine on my Logstash instances, and everything is working fine again now!
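For reference, a minimal sketch of how to do that (assuming Logstash 7.x, where the `pipeline.java_execution` setting mentioned later in this thread controls the engine), in `logstash.yml`:

```
# Fall back to the legacy Ruby execution engine
pipeline.java_execution: false
```

The same can be done for a single run with the `--java-execution=false` command-line flag.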

Let me know if you need more information.


A reproducible example would help. There was a period of months where I was convinced that with 'pipeline.java_execution: true', even with '--pipeline.workers 1', a logstash pipeline would re-order events. Now I understand that event order is not guaranteed anyway, but I am no longer able to construct an example that demonstrably re-orders events when '-w 1' is set.

When I thought I was able to demonstrate this I was using 5 to 10 events, certainly far less than a pipeline batch.
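For anyone who wants to experiment, here is a minimal sketch of the kind of pipeline I mean (nothing here is from the original setup, just the idea: the `sequence` field added by the generator input makes any re-ordering visible in the output):

```
input {
  generator {
    # a handful of events, far fewer than a pipeline batch
    count => 10
  }
}
output {
  # If order is preserved, [sequence] should come out as 0..9 in order
  stdout { codec => rubydebug }
}
```

Run it with `-w 1` and compare the output order with `--java-execution=true` and `--java-execution=false`.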

Having the following log lines:

11:51:57,440 INFO  [org.apache.cxf] (http-server1:18080-2) Inbound Message
 ----------------------------
 ID: 341677
  
 11:51:57,554 INFO  [org.apache.cxf] (http-server1:18080-2) Outbound Message
 ---------------------------
 ID: 341677
  
  
 11:51:57,560 INFO  [org.apache.cxf] (http-server1:18080-2) Inbound Message
 ----------------------------
 ID: 341678
  
 11:51:57,635 INFO  [org.apache.cxf] (http-server1:18080-2) Outbound Message
 ---------------------------
 ID: 341678

and defined the following filters:

    grok {
        match => [ "message", "%{TIME:time} %{WORD:log.level}[ \t]+%{GREEDYDATA:t_shortmessage}"]
    }        

    grok {
        match => [ "t_shortmessage", "\[%{JAVAFILE}\][ \t]+\(%{DATA:thread}\)[ \t]%{GREEDYDATA}"]
    }

    if "Inbound Message" in [t_shortmessage] {
        aggregate {
            task_id => "%{[application][name]}-%{[host][name]}-%{thread}"
            code => "map['start_message'] = event.get('t_shortmessage'); map['start_time'] = event.get('@timestamp')"
            map_action => "create"
            timeout => 30
            timeout_tags => ['Aggregated']
            timeout_code => "event.set('transaction.time', 30 ); event.set('transaction.result', -1 )"
        }
    }
    if "Outbound Message" in [t_shortmessage] {
        aggregate {
            task_id => "%{[application][name]}-%{[host][name]}-%{thread}"
            code => "event.set('start_message', map['start_message']); event.set('transaction.time', (event.get('@timestamp')-map['start_time'])*1000)"
            map_action => "update"
            end_of_task => true
            add_tag => [ "Aggregated" ]
        }
    }

With the Java execution engine, the filter was aggregating the inbound log with ID 341678 with the outbound log with ID 341677, calculating a negative transaction.time. In addition, the outbound message with ID 341678 was left alone, without aggregation.

This, of course, doesn't happen all the time, but it happens at a higher rate when the end of one transaction is very close to the start of a new one.


Hello,
As a prerequisite, as @Badger stated, this can only work when enforcing a single worker.

When the Java execution engine is used, we've observed that events can get re-arranged internally, due to the engine's execution model.

In the documentation we've always, to some extent, "guaranteed" this behavior when forcing a single worker.

The issue https://github.com/elastic/logstash/issues/10938 has been opened to guarantee the correct behavior of the aggregate filter when the number of workers is set to 1.
We do not have an ETA for this fix.

I've tried to reproduce your pipeline.

I had to apply a few modifications: I changed the task_id to use only the thread (with http-server2 for the second transaction), as in the generated log there is no [application][name] or [host][name].

input {
  generator {
    lines => [
      "11:51:57,440 INFO  [org.apache.cxf] (http-server1:18080-2) Inbound Message",
      "11:51:57,554 INFO  [org.apache.cxf] (http-server1:18080-2) Outbound Message",
      "11:51:57,560 INFO  [org.apache.cxf] (http-server2:18080-2) Inbound Message",
      "11:51:57,635 INFO  [org.apache.cxf] (http-server2:18080-2) Outbound Message"
    ]
    count => 1
  }
}

filter {
    grok {
        match => [ "message", "%{TIME:time} %{WORD:log.level}[ \t]+%{GREEDYDATA:t_shortmessage}"]
    }        

    grok {
        match => [ "t_shortmessage", "\[%{JAVAFILE}\][ \t]+\(%{DATA:thread}\)[ \t]%{GREEDYDATA}"]
    }

    if "Inbound Message" in [t_shortmessage] {
        aggregate {
            task_id => "app-%{thread}"
            code => "map['start_message'] = event.get('t_shortmessage'); map['start_time'] = event.get('@timestamp')"
            map_action => "create"
            timeout => 30
            timeout_tags => ['Aggregated']
            timeout_code => "event.set('transaction.time', 30 ); event.set('transaction.result', -1 )"
        }
    }
    if "Outbound Message" in [t_shortmessage] {
        aggregate {
            task_id => "app-%{thread}"
            code => "event.set('start_message', map['start_message']); event.set('transaction.time', (event.get('@timestamp')-map['start_time'])*1000)"
            map_action => "update"
            end_of_task => true
            add_tag => [ "Aggregated" ]
        }
    }
}

output {
    if "Aggregated" in [tags] {
        stdout { codec => rubydebug }
    }
}

You can run it with:

  • ./bin/logstash -f test-aggregate.conf -w 1 --java-execution=true
  • ./bin/logstash -f test-aggregate.conf -w 1 --java-execution=false

With this pipeline, there's a major issue: @timestamp is not a representation of the time field you extract from the log line.
The transaction.time will just represent the amount of time taken by Logstash to process the lines.
It is possible to parse the time into an epoch, but since we're missing the full date, we would have issues with transactions that cross a day boundary.
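As a sketch, the extracted `time` field could be parsed into `@timestamp` with the `date` filter (assuming the `HH:mm:ss,SSS` layout shown in the logs; with no date component, the filter fills in the current date, which is exactly the day-boundary problem mentioned above):

```
date {
  # Parse only the time-of-day; the missing date defaults to "today",
  # which breaks for transactions spanning midnight
  match => [ "time", "HH:mm:ss,SSS" ]
}
```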

In any case, the output is:

{
          "@timestamp" => 2019-07-24T17:51:09.777Z,
            "@version" => "1",
              "thread" => "http-server1:18080-2",
             "message" => "11:51:57,554 INFO  [org.apache.cxf] (http-server1:18080-2) Outbound Message",
            "sequence" => 0,
    "transaction.time" => 21.0,
       "start_message" => "[org.apache.cxf] (http-server1:18080-2) Inbound Message",
           "log.level" => "INFO",
      "t_shortmessage" => "[org.apache.cxf] (http-server1:18080-2) Outbound Message",
                "tags" => [
        [0] "Aggregated"
    ],
                "time" => "11:51:57,554"
}
{
          "@timestamp" => 2019-07-24T17:51:09.778Z,
            "@version" => "1",
              "thread" => "http-server2:18080-2",
             "message" => "11:51:57,635 INFO  [org.apache.cxf] (http-server2:18080-2) Outbound Message",
            "sequence" => 0,
    "transaction.time" => 0.0,
       "start_message" => "[org.apache.cxf] (http-server2:18080-2) Inbound Message",
           "log.level" => "INFO",
      "t_shortmessage" => "[org.apache.cxf] (http-server2:18080-2) Outbound Message",
                "tags" => [
        [0] "Aggregated"
    ],
                "time" => "11:51:57,635"
}