Hey,
Currently I have a Logstash pipeline that parses, aggregates and finally delivers Postfix logs to Elasticsearch. I want to retire Logstash completely and use Fleet to deploy a policy with the Custom Logs integration. I have managed to parse the logs, and I'm now at the last step, where I need to aggregate them based on a field.
(Postfix splits the handling of a single mail across several processes, so one logical delivery produces multiple log lines. These are linked only by the "queueid", which is already parsed into its own field at this point.)
Basically, I need the Elastic ingest pipeline processor equivalent of the following Logstash snippet:
filter {
  if ![queueid] {
    drop {}
  } else {
    aggregate {
      task_id => "%{queueid}"
      aggregate_maps_path => "/inputs/.aggregate_maps"
      code => "
        map.merge!(event)
      "
      map_action => "create_or_update"
      push_previous_map_as_event => false
      push_map_as_event_on_timeout => true
      timeout => 30
      timeout_tags => ['aggregated']
    }
  }
}
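To make the intended result concrete, here is a rough Python sketch of what I expect from the snippet above: events without a queueid are dropped, and events sharing a queueid are merged into one document tagged 'aggregated'. The function name and the sample field values are made up for illustration, and the sketch ignores the 30-second timeout by simply emitting everything at the end.

```python
def aggregate_by_queueid(events):
    """Merge events that share a queueid into one dict per queueid.

    Hypothetical helper for illustration only; it is not part of
    Logstash or Elasticsearch.
    """
    maps = {}
    for event in events:
        queueid = event.get("queueid")
        if not queueid:
            # Mirrors `drop {}` for events without a queueid.
            continue
        # Mirrors map.merge!(event): later fields overwrite earlier ones.
        maps.setdefault(queueid, {}).update(event)
    # Stand-in for push_map_as_event_on_timeout: emit each merged map
    # once, tagged the same way timeout_tags would tag it.
    return [dict(merged, tags=["aggregated"]) for merged in maps.values()]


# Made-up sample events, already parsed into fields.
sample_events = [
    {"queueid": "4F1A2B3C", "from": "alice@example.org"},
    {"queueid": "4F1A2B3C", "to": "bob@example.org", "status": "sent"},
    {"message": "connect from unknown"},  # no queueid, so it is dropped
]

result = aggregate_by_queueid(sample_events)
```

With the sample input, result should hold a single document combining the from, to and status fields for that queueid.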
How can I reproduce this functionality with ingest pipeline processors?
Thanks