Logstash field is never shown after aggregation

I am running Logstash 7.8.0. Can someone tell me why the aggregation below never adds the thread_id field to the documents, please? The thread_id field is set at the end of the aggregate code. Sample log lines and my filter:

2024-12-14 12:00:01 thread-1 SOAP message <<Envelope 011>>
2024-12-14 12:00:02 thread-1 SOAP message >>Envelope 012<<
2024-12-14 12:05:03 thread-2 SOAP message <<Envelope 021>>
2024-12-14 12:05:04 thread-2 SOAP message >>Envelope 022<<
filter {
  grok {
    match => {
      "message" => [
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
      ]
    }
  }
  aggregate {
    task_id => "%{thread_id}"
    code => "
      map['soap_in'] ||= []
      map['soap_out'] ||= []

      # Capture soap_in with timestamp if exists
      if event.get('soap_in')
        map['soap_in'] << {'soap_in' => event.get('soap_in'), 'log_timestamp' => event.get('log_timestamp')}
      end

      # Capture soap_out with timestamp if exists
      if event.get('soap_out')
        map['soap_out'] << {'soap_out' => event.get('soap_out'), 'log_timestamp' => event.get('log_timestamp')}
      end

      # Once both soap_in and soap_out are available, emit the aggregated event
      if map['soap_in'] && map['soap_out']
        event.set('soap_in', map['soap_in'])
        event.set('soap_out', map['soap_out'])
        event.set('thread_id', event.get('thread_id'))
        event.cancel()
      end
    "
    push_previous_map_as_event => true
    timeout => 3
  }
  mutate {
    remove_field => ["message"]
  }
}

The result never contains thread_id:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "app-aggregate-2024.12.13",
        "_type" : "_doc",
        "_id" : "qop5wJMB81mNBoMqWzzC",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "soap_out" : [
            {
              "log_timestamp" : "2024-12-14 12:00:02",
              "soap_out" : "Envelope 012"
            }
          ],
          "@timestamp" : "2024-12-13T14:43:18.985Z",
          "soap_in" : [
            {
              "log_timestamp" : "2024-12-14 12:00:01",
              "soap_in" : "Envelope 011"
            }
          ]
        }
      },
      {
        "_index" : "app-aggregate-2024.12.13",
        "_type" : "_doc",
        "_id" : "rop5wJMB81mNBoMqbTwN",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "soap_out" : [
            {
              "log_timestamp" : "2024-12-14 12:05:04",
              "soap_out" : "Envelope 022"
            }
          ],
          "@timestamp" : "2024-12-13T14:43:23.504Z",
          "soap_in" : [
            {
              "log_timestamp" : "2024-12-14 12:05:03",
              "soap_in" : "Envelope 021"
            }
          ]
        }
      }
    ]
  }
}

So in Elasticsearch we get 2 documents instead of 4, which is what I want, but I still don't understand why the thread_id field never appears even though it is set in the aggregate code. Thank you in advance.

The three event.set calls have no effect and can be deleted. They add fields to an event that is immediately discarded. Then, when an event with a different task_id (thread_id) arrives, a new event is created from the contents of the map, which does not include thread_id.

Try adding map['thread_id'] = event.get('thread_id') at the top of the code block.

You are not relying on a timeout, so I don't think the timeout_task_id_field option can be used to shortcut this.
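
To illustrate the point about the map, here is a rough plain-Ruby sketch. It is a simplified stand-in that only mimics the behaviour described above, not the plugin's real code. The aggregate helper and its store_thread_id flag are hypothetical names made up for this example.

```ruby
# Simplified stand-in for the aggregate filter with push_previous_map_as_event.
# Assumption: this models only the behaviour described in the post above.
def aggregate(lines, store_thread_id:)
  emitted = []      # events built from maps, i.e. what would be indexed
  current_id = nil
  map = nil

  lines.each do |event|
    if event['thread_id'] != current_id
      emitted << map if map   # a new task_id arrives: push the previous map
      current_id = event['thread_id']
      map = {}
    end

    # The suggested fix: keep the task id inside the map itself.
    map['thread_id'] = event['thread_id'] if store_thread_id
    map['soap_in'] ||= []
    map['soap_out'] ||= []
    map['soap_in'] << event['soap_in'] if event['soap_in']
    map['soap_out'] << event['soap_out'] if event['soap_out']
    # Anything set on the original event here would be lost, because the
    # original event is cancelled and never appended to emitted.
  end

  emitted << map if map       # stands in for the timeout flushing the last map
  emitted
end

LINES = [
  { 'thread_id' => '1', 'soap_in'  => 'Envelope 011' },
  { 'thread_id' => '1', 'soap_out' => 'Envelope 012' },
  { 'thread_id' => '2', 'soap_in'  => 'Envelope 021' },
  { 'thread_id' => '2', 'soap_out' => 'Envelope 022' }
].freeze

without_id = aggregate(LINES, store_thread_id: false)
with_id    = aggregate(LINES, store_thread_id: true)

puts without_id.first.key?('thread_id')          # false
puts with_id.map { |e| e['thread_id'] }.inspect  # ["1", "2"]
```

In both runs two events come out for the four input lines, but only the run that writes thread_id into the map carries the field.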


Thank you M.BADGET, it works with timeout => 3. When I delete that line, I have to wait for lines from other threads to arrive before the result shows up!

Just one more question: this sample is very short, with only 4 rows. Would this kind of filter still work with thousands and thousands of log lines?
Thank you in advance.

filter {
  grok {
    match => {
      "message" => [
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
      ]
    }
  }
  aggregate {
    task_id => "%{thread_id}"
    code => "
      map['soap_in'] ||= []
      map['soap_out'] ||= []
      # Keep thread_id in the map so it is part of the event pushed from the map
      map['thread_id'] = event.get('thread_id')

      # Capture soap_in with its timestamp, if present
      if event.get('soap_in')
        map['soap_in'] << {'soap_in' => event.get('soap_in'), 'log_timestamp' => event.get('log_timestamp')}
      end

      # Capture soap_out with its timestamp, if present
      if event.get('soap_out')
        map['soap_out'] << {'soap_out' => event.get('soap_out'), 'log_timestamp' => event.get('log_timestamp')}
      end

      # Drop the original line event; only the events pushed from the map are indexed
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
  mutate {
    remove_field => ["message"]
  }
}