Elapsed filter plugin issues

Hi all,

I'll try to keep this short and technical enough not to be boring (I too hate reading long, rambling problem descriptions).

I've got some issues with the elapsed filter plugin. Here is my setup in a nutshell.
Note that this setup works fine for all my pipelines except the new one, which uses the "elapsed" plugin.
I use the official Logstash Docker image from Elastic, version 7.6.1 (docker.elastic.co/logstash/logstash:7.6.1).

This is the new pipeline configuration (note that workers is set to 1, as more than one worker seems to be an issue with "elapsed"):

  - pipeline.id: BLAH
    path.config: "/usr/share/logstash/pipeline/BLAH"
    pipeline.workers: 1
    pipeline.batch.size: 200
    pipeline.batch.delay: 50

My input is S3, which is where the log files come from:

input {
  s3 {
    codec => multiline {
      pattern => "^ %{TIMESTAMP_ISO8601} %{ISO8601_TIMEZONE} "
      what => "previous"
      negate => true
    }
    bucket => "blah-logs"
    type => "s3-access"
    region => "ap-southeast-2"
    prefix => "BLAH.LOGS"
    backup_add_prefix => "logstash-processed/"
    backup_to_bucket => "blah-logs"
    interval => 60
    delete => true
    add_field => {
      "doctype" => "blah"
      "es_index" => "blah-logs"
      "aws_environment" => "BLAH"
      "aws_region" => "someregion"
    }
  }
}

This is my filter with the elapsed issues (everything else is fine, and the tags are set up properly):

filter {
  if [doctype] == "blah-logs" {
    grok {
      match => [ "message", '^ %{TIMESTAMP_ISO8601:log_time} %{ISO8601_TIMEZONE:log_timezone} \[%{WORD:event_type}\] %{GREEDYDATA:event_message}' ]
      tag_on_failure => ["_grokparsefailuremain"]
      # remove_field => ["message"]
    }
    mutate {
      add_field => {
        "log_timestamp" => "%{log_time}%{log_timezone}"
      }
      remove_field => ["log_time", "log_timezone"]
    }
    date {
      match => [ "log_timestamp", "ISO8601" ]
      remove_field => ["log_timestamp"]
    }

    if [event_message] =~ "BLAHAPISender.Send - Start Place Order for OrderNo" {
        grok {
            match => { "event_message" => "BLAHAPISender.Send - Start Place Order for OrderNo %{NOTSPACE:order_id}" }
            add_tag => [ "OrderStarted"]
            tag_on_failure => ["_grokparsefail-start"]
        }
    }
    if [event_message] =~ "Place Order Results for OrderNo" {
        grok {
            match => { "event_message" => "Place Order Results for OrderNo %{NOTSPACE:order_id}: True" }
            add_tag => [ "OrderProcessed"]
            tag_on_failure => ["_grokparsefail-stop"]
        }
    }

    elapsed {
        start_tag => "OrderStarted"
        end_tag => "OrderProcessed"
        unique_id_field => "order_id"
        timeout => 120
        new_event_on_match => false
        keep_start_event => "first"
    }
  }
}

My output then goes to a 3-node cluster in AWS.

The behavior I get:

  • On the first run, a log is processed and most (but not all) of the elapsed events end up in the "elapsed_end_without_start" state.
  • If I run it a second time (with a clean index, or by adding the events again), they all come out correct.

My end goal is to have 3 different elapsed filters in this pipeline. It always fails with "elapsed_end_without_start" when I try with all 3, so my first goal is to get consistent behavior with 1 elapsed filter.

I hope someone will be able to help me with this. My 2 cents is that the Logstash daemon seems to be "learning" from the first pass and then gets the second pass right. Are the order_id values still in memory? Is it reading too fast the first time?
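
To make the ordering question concrete, here is a minimal Python sketch of start/end matching keyed on order_id. It is a toy model for illustration only, not the plugin's actual code: the names Event, ElapsedModel, run and make_events are hypothetical, and it simply assumes that an end event can reach the filter before its start event.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Event:
    timestamp: float                      # event time in seconds
    order_id: str                         # plays the role of unique_id_field
    tags: List[str] = field(default_factory=list)
    elapsed_time: Optional[float] = None  # set only when a start/end pair matches


class ElapsedModel:
    """Toy start/end matcher (hypothetical, not the elapsed plugin's implementation)."""

    def __init__(self, start_tag: str, end_tag: str) -> None:
        self.start_tag = start_tag
        self.end_tag = end_tag
        self.pending: Dict[str, Event] = {}  # start events still waiting for an end

    def process(self, event: Event) -> Event:
        if self.start_tag in event.tags:
            # Keep only the first start event seen for this ID.
            self.pending.setdefault(event.order_id, event)
        elif self.end_tag in event.tags:
            start = self.pending.pop(event.order_id, None)
            if start is None:
                # The end arrived before (or without) its start.
                event.tags.append("elapsed_end_without_start")
            else:
                event.tags += ["elapsed", "elapsed_match"]
                event.elapsed_time = event.timestamp - start.timestamp
        return event


def run(events: List[Event]) -> List[Event]:
    """Feed events through a fresh matcher in the order given."""
    model = ElapsedModel("OrderStarted", "OrderProcessed")
    return [model.process(e) for e in events]


def make_events() -> List[Event]:
    return [
        Event(10.0, "A1", ["OrderStarted"]),
        Event(12.5, "A1", ["OrderProcessed"]),
    ]


in_order = run(make_events())                   # start first, then end
reordered = run(list(reversed(make_events())))  # end first, then start

print(in_order[1].tags, in_order[1].elapsed_time)
print(reordered[0].tags, reordered[0].elapsed_time)
```

In this model the in-order pair matches, while the reordered pair tags the end event with "elapsed_end_without_start" and leaves the start event pending in memory. A pending start left over like that could then match an end event with the same ID on a later pass, which would fit the second-run behavior I see, but that is only a guess.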

Small update:

I've changed the "pipeline.batch.size" of the pipeline to 1.
The behavior is now better, with probably around 80% success with 3 elapsed filters. So the order, timing, or way in which Logstash processes events definitely has something to do with this.

Update 2:
I kept the very inefficient setting

pipeline.batch.size: 1

I now use only 2 elapsed filters and get 100% success.

So the first lesson is that I can't attach 2 elapsed filters to the same end_tag, even if one has
new_event_on_match => false
and the other has
new_event_on_match => true

Well, OK, that's one step forward. It's a pity I had to learn it empirically. I'll now test the workers setting, since 1 is quite limiting.

Update 3:

I now have everything working consistently. My only remaining issue is performance:

pipeline.workers: 1
pipeline.batch.size: 1

This is quite a limiting configuration.
Can someone from Elastic have a look at this?