Logstash Elasticsearch filter lookup fails 99% of the time if End is too close to Start?

Hi,

I am trying to do the standard enrichment of End items with the details of the matching Start item.

filter {
  if [benchmarkType] == "End" {
    elasticsearch {
      hosts => ["MY-IP:9200"]
      index => "prod-*"
      query => 'benchmarkType:Start AND messageId:"%{[messageId]}"'
      fields => { "precisionDate" => "initialPrecisionDate" "Elapsed" => "initialElapsed" }
      tag_on_failure => ["_elasticsearch_lookup_failure"]
    }
    date {
      match => ["[initialPrecisionDate]", "ISO8601"]
      target => "[initialPrecisionDate]"
    }

    date {
      match => ["[precisionDate]", "ISO8601"]
      target => "[precisionDate]"
    }

    ruby {
      code => 'event.set("TotalDuration", (event.get("precisionDate") - event.get("initialPrecisionDate") - event.get("initialElapsed")/1000 ) ) rescue nil'
    }
  }
}

99% of the items are not enriched: the looked-up fields are empty, and the events are not tagged with _elasticsearch_lookup_failure either.

Could this be because the @timestamp difference between the Start and End items is less than 20 ms, so that the lookup made by the End item cannot yet find the Start item?

If you are enriching events and sending them to Elasticsearch, and then trying to look up the enrichments using an Elasticsearch filter, then you need to understand that events are processed in batches. A batch of 125 events (by default) is processed through each filter before being sent to the next filter and then the output.

If two related events are in the same batch, then the lookup of the first one will fail, because it has not yet been sent to Elasticsearch when the second one passes through the filter.
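One way to see which End events hit this case is to tag them explicitly. The symptom in the question (empty fields, no _elasticsearch_lookup_failure tag) would be consistent with the query running without error but returning no hits; that is an assumption, not something confirmed in this thread. A minimal sketch, reusing the filter from the question and adding a hypothetical _start_not_found tag:

```logstash
filter {
  if [benchmarkType] == "End" {
    elasticsearch {
      hosts => ["MY-IP:9200"]
      index => "prod-*"
      query => 'benchmarkType:Start AND messageId:"%{[messageId]}"'
      fields => { "precisionDate" => "initialPrecisionDate" "Elapsed" => "initialElapsed" }
      tag_on_failure => ["_elasticsearch_lookup_failure"]
    }

    # If the lookup copied nothing onto the event, mark it so these
    # events can be counted and inspected later.
    # "_start_not_found" is a hypothetical tag name chosen for this sketch.
    if ![initialPrecisionDate] {
      mutate {
        add_tag => ["_start_not_found"]
      }
    }
  }
}
```

Counting events with this tag against events carrying _elasticsearch_lookup_failure should show whether the lookups are erroring or simply finding nothing.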

You may be able to use an aggregate filter. See Example 1. Make sure you read the restrictions around pipeline.workers and pipeline.ordered.
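For the events in this thread, the aggregate approach might look roughly like the sketch below. It is modeled on Example 1 from the aggregate filter documentation and assumes the field names from the question. The timeout value is an arbitrary placeholder, and the sketch assumes the pipeline runs with a single worker (pipeline.workers: 1) so that each Start event is processed before its End event.

```logstash
filter {
  if [benchmarkType] == "Start" {
    aggregate {
      task_id => "%{messageId}"
      map_action => "create"
      # Remember the Start values until the matching End event arrives.
      code => "map['initialPrecisionDate'] = event.get('precisionDate'); map['initialElapsed'] = event.get('Elapsed')"
    }
  }

  if [benchmarkType] == "End" {
    aggregate {
      task_id => "%{messageId}"
      map_action => "update"
      # Copy the stored Start values onto the End event.
      # With map_action "update" this code only runs if a map exists for the task_id.
      code => "event.set('initialPrecisionDate', map['initialPrecisionDate']); event.set('initialElapsed', map['initialElapsed'])"
      end_of_task => true
      # Placeholder: discard maps whose End event never arrives within 120 seconds.
      timeout => 120
    }
  }
}
```

With more than one worker, an End event can reach the filter before its Start event, in which case no map exists yet and the End event passes through unenriched.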

Thank you for the suggestion.

I have switched to the aggregate filter, and about 70% of the items are now enriched with the total duration.
To cover the rest I tried combining the two approaches, but the Elasticsearch lookup does not seem to run for the events that could not be aggregated (I assume because the Start and End items were not in the same 125-event batch):

if [benchmarkType] == "Start" {
  aggregate {
    task_id => "%{messageId}"
    map_action => "create"
    code => "map['initialPrecisionDate'] ||= event.get('precisionDate')
             map['initialElapsed'] ||= event.get('Elapsed')/1000"
  }
}

if [benchmarkType] == "End" {
  aggregate {
    task_id => "%{messageId}"
    map_action => "update"
    end_of_task => true
    timeout => 600
    timeout_tags => ['_aggregate_Precision']
    code => "event.set('TotalDuration',Time.parse(event.get('precisionDate')).to_f - Time.parse(map['initialPrecisionDate']).to_f + map['initialElapsed'] ) rescue nil"
  }
  if ![TotalDuration] or [TotalDuration] == "" {
    elasticsearch {
      hosts => ["MY-IP:9200"]
      index => "prod-*"
      query => 'benchmarkType:Start AND messageId:"%{[messageId]}"'
      fields => { "precisionDate" => "initialPrecisionDate" "Elapsed" => "initialElapsed" }
      tag_on_failure => ["_elasticsearch_lookup_failure"]
    }
    date {
      match => ["[initialPrecisionDate]", "ISO8601"]
      target => "[initialPrecisionDate]"
    }

    date {
      match => ["[precisionDate]", "ISO8601"]
      target => "[precisionDate]"
    }
    ruby {
      code => 'event.set("TotalDuration", (event.get("precisionDate") - event.get("initialPrecisionDate") + event.get("initialElapsed")/1000 ) ) rescue nil'
    }
  }
}

Any idea why I am still missing the remaining 30%, or why the condition on the TotalDuration field in the code above does not work and the lookup is never performed?
