Aggregation in Ingest Pipeline of Elastic Agent - Custom Logs integration

Hey,
Currently I have a Logstash pipeline parsing, aggregating, and finally delivering Postfix logs to Elasticsearch. I want to retire Logstash completely and use Fleet to deploy a policy with the Custom Logs integration. I managed to parse the logs, but I'm now at the last step, at which I need to aggregate them based on a field.
(Postfix logs are structured in such a way that one logical mail delivery is split across numerous "physical" processes, and thus log lines, connected only by the "queueid", which is a successfully parsed field by this point.)
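To illustrate what I mean (made-up hostnames, addresses, and queue ID), a single delivery produces lines from several Postfix daemons that share only the queue ID:

```
Oct 12 10:15:01 mail postfix/smtpd[1234]: 4ABC12DEF34: client=example.com[203.0.113.5]
Oct 12 10:15:02 mail postfix/qmgr[987]: 4ABC12DEF34: from=<alice@example.com>, size=2048, nrcpt=1 (queue active)
Oct 12 10:15:03 mail postfix/smtp[5678]: 4ABC12DEF34: to=<bob@example.org>, status=sent (250 2.0.0 OK)
```

Here `4ABC12DEF34` is the queueid that ties the three lines together, and the aggregation step merges them into one event.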

Basically, I need the equivalent, in Elasticsearch ingest pipeline processors, of the following Logstash snippet:

filter {
  if ![queueid] {
    drop {}
  } else {
    aggregate {
      task_id => "%{queueid}"
      aggregate_maps_path => "/inputs/.aggregate_maps"
      code => "
        map.merge!(event)
      "
      map_action => "create_or_update"
      push_previous_map_as_event => false
      push_map_as_event_on_timeout => true
      timeout => 30
      timeout_tags => ['aggregated']
    }
  }
}
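For context, my understanding so far is that ingest processors see one document at a time and keep no state between documents, so the aggregate filter may have no direct processor equivalent. The closest thing I've found is a continuous pivot transform grouped by queueid. This is only a rough sketch, assuming queueid is mapped as a keyword; the index names and the status/to fields are placeholders for whatever the parsing pipeline actually produces:

```json
PUT _transform/postfix-aggregate
{
  "source": {
    "index": "logs-postfix-*",
    "query": { "exists": { "field": "queueid" } }
  },
  "dest": { "index": "postfix-aggregated" },
  "frequency": "30s",
  "sync": { "time": { "field": "@timestamp", "delay": "60s" } },
  "pivot": {
    "group_by": {
      "queueid": { "terms": { "field": "queueid" } }
    },
    "aggregations": {
      "first_seen": { "min": { "field": "@timestamp" } },
      "last_seen": { "max": { "field": "@timestamp" } },
      "latest": {
        "top_metrics": {
          "metrics": [{ "field": "status" }, { "field": "to" }],
          "sort": { "@timestamp": "desc" }
        }
      }
    }
  }
}
```

That said, a transform continuously rewrites documents in a destination index rather than emitting a single merged event after a timeout the way `push_map_as_event_on_timeout => true` does, so it doesn't look like a drop-in replacement either.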

How would one reproduce this functionality with ingest pipeline processors?
Thanks
