How to aggregate log transactions based on field value in logstash

My log file looks like this:


    start time: 2021-10-11T13:54:34Z
    category: commercial
    status: started
    end time:

    start time: 2021-10-11T13:54:34Z
    category: commercial
    status: Running
    end time:

    start time: 2021-10-11T13:54:34Z
    category: commercial
    status: Completed
    end time: 2021-10-11T13:59:12Z

    start time: 2021-10-11T20:14:23Z
    category: Small Business
    status: started
    end time:

    start time: 2021-10-11T20:14:23Z
    category: Small Business
    status: Completed
    end time: 2021-10-11T20:35:21Z

I want to aggregate log transactions based on the category value.

In other words, the first three transactions (status started, Running, Completed) should be aggregated into one transaction for category "commercial", and the last two transactions (status started, Completed) should be aggregated into one transaction for category "Small Business". How can I do this?

Right now I am using the grok pattern below, which reads each transaction separately, so at the target side category "commercial" has 3 transactions and "Small Business" has 2.

    grok {
        match => { "message" => "%{DATA:start_time_name}:%{DATA:start_time}#%{DATA:category_name}:%{DATA:category}#%{DATA:status_name}:%{DATA:status}#%{DATA:end_time_name}:%{DATA:end_time}" }
    }

I am new to Logstash and am struggling to aggregate logs based on a particular field value as described above. How can I achieve this?

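I would use a file input with a multiline codec to consume that log. Each record begins with a "start time" line, so every line that does not match "^start time" is appended to the previous event: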

    codec => multiline { pattern => "^start time" negate => true what => previous auto_flush_interval => 1 multiline_tag => "" }
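To illustrate what those codec settings do, here is a small plain-Ruby simulation. It is not Logstash code, and group_records is a hypothetical helper name. With negate => true and what => previous, every line that does not match ^start time is appended to the event before it, so each record becomes one event (blank separator lines end up attached to the preceding record).

```ruby
# Plain-Ruby simulation of the multiline codec settings above (not Logstash code):
#   pattern => "^start time", negate => true, what => previous
# A line matching the pattern starts a new event; any other line is appended
# to the event before it.
def group_records(lines)
  records = []
  lines.each do |line|
    if line.match?(/\Astart time/) || records.empty?
      records << line.dup                # start a new event
    else
      records.last << "\n" << line       # append to the previous event
    end
  end
  # In Logstash the last pending event is flushed by auto_flush_interval;
  # here it is simply returned at the end of the input.
  records
end

# A shortened copy of the sample log (two records only).
log = <<~LOG
  start time: 2021-10-11T13:54:34Z
  category: commercial
  status: started
  end time:

  start time: 2021-10-11T20:14:23Z
  category: Small Business
  status: Completed
  end time: 2021-10-11T20:35:21Z
LOG

records = group_records(log.lines(chomp: true))
puts records.length # => 2
```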

Then use a kv filter to split out the fields, and an aggregate filter keyed on the category to combine each category's events into one:

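    filter {
        # Split each multiline event into fields: one "key: value" pair per line
        kv { field_split_pattern => "\n" value_split => ":" }
        # Note: the aggregate filter requires a single filter worker (pipeline.workers: 1)
        aggregate {
            # One map per category; each later event overwrites the earlier values
            task_id => "%{category}"
            code => '
                map["startTime"] = event.get("start time")
                map["endTime"] = event.get("end time")
                map["status"] = event.get("status")
                event.cancel # drop the individual per-record event
            '
            # When the task times out, emit the map as a new event with a "category" field
            push_map_as_event_on_timeout => true
            timeout_task_id_field => "category"
            timeout => 6
        }
    }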

This will produce two events, one per category:

    {
    "@timestamp" => 2021-10-20T01:36:48.874Z,
      "@version" => "1",
     "startTime" => "2021-10-11T13:54:34Z",
       "endTime" => "2021-10-11T13:59:12Z",
      "category" => "commercial",
        "status" => "Completed"
    }
    {
    "@timestamp" => 2021-10-20T01:36:48.875Z,
      "@version" => "1",
     "startTime" => "2021-10-11T20:14:23Z",
       "endTime" => "2021-10-11T20:35:21Z",
      "category" => "Small Business",
        "status" => "Completed"
    }
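As a rough model of why the kv and aggregate steps yield one event per category, here is a plain-Ruby sketch. It is a simulation under simplifying assumptions, not how either filter is implemented, and parse_record and aggregate_by_category are hypothetical names. Each record is split on the first colon of each line, empty values are dropped (kv's default), and one map per category keeps the values from the last event seen. Like the aggregate filter, it depends on events arriving in order, which is why the aggregate filter needs a single pipeline worker.

```ruby
# Approximates kv { field_split_pattern => "\n" value_split => ":" } for this data:
# split on the FIRST colon only (timestamps contain colons), trim whitespace,
# and drop keys whose value is empty (kv drops empty values by default).
def parse_record(record)
  record.split("\n").each_with_object({}) do |line, fields|
    key, value = line.split(":", 2)
    next if value.nil?            # skip lines without a colon (e.g. blank lines)
    value = value.strip
    fields[key.strip] = value unless value.empty?
  end
end

# Approximates the aggregate filter here: one map per category
# (task_id => "%{category}"). Each event overwrites startTime, endTime and
# status, so the map ends up holding the values of the last event.
def aggregate_by_category(events)
  maps = {}
  events.each do |event|
    map = (maps[event["category"]] ||= {})
    map["startTime"] = event["start time"]
    map["endTime"] = event["end time"]
    map["status"] = event["status"]
  end
  # Like push_map_as_event_on_timeout with timeout_task_id_field => "category":
  # each map becomes one event carrying its category.
  maps.map { |category, map| map.merge("category" => category) }
end

records = [
  "start time: 2021-10-11T13:54:34Z\ncategory: commercial\nstatus: started\nend time:",
  "start time: 2021-10-11T13:54:34Z\ncategory: commercial\nstatus: Running\nend time:",
  "start time: 2021-10-11T13:54:34Z\ncategory: commercial\nstatus: Completed\nend time: 2021-10-11T13:59:12Z",
  "start time: 2021-10-11T20:14:23Z\ncategory: Small Business\nstatus: started\nend time:",
  "start time: 2021-10-11T20:14:23Z\ncategory: Small Business\nstatus: Completed\nend time: 2021-10-11T20:35:21Z"
]

result = aggregate_by_category(records.map { |r| parse_record(r) })
result.each { |event| p event }
```

Because the task id is the category alone, a later run in the same category that arrives before the task times out would be merged into the same event. If that can happen, a task_id that also includes the start time field may be needed, since all records of one transaction share the same start time.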
