Aggregate filter plugin - final event contains empty message

Hello,

Logstash version: 7.17
Aggregate filter plugin: v2.10.0

Contents for /var/log/logstash/input.log:

{"timestamp": "2023-07-31T15:10:45.141Z", "parentOnly": 1, "logger_name": "activity_stream", "job": 102693, "type": "tower-logs"}
{"timestamp": "2023-07-31T15:10:48.392Z", "childOnly": 1, "logger_name": "job_events", "job": 102693, "type": "tower-logs"}
{"timestamp": "2023-07-31T15:10:50.392Z", "finalOnly": 1, "logger_name": "maindispatch", "job": 102693, "type": "tower-logs"}

Logstash configuration:

input {
    file {
      path => "/var/log/logstash/input.log"
      type => "tower-logs"
    }
}
filter {
    if [type] == "tower-logs" {
        json {
            source => "message"
            target => "linca"
        }
        if [logger_name] == "activity_stream" {
            aggregate {
                task_id => "%{job}"
                code => "
                   map['compositeOnly'] = event.get('parentOnly')
                "
                map_action => "create"
            }
        }
        if [logger_name] == "job_events" {
            aggregate {
                task_id => "%{job}"
                code => "map['compositeOnly'] += event.get('childOnly')
                "
                map_action => "update"
            }
        }
        if [logger_name] == "dispatch" {
            aggregate {
                task_id => "%{job}"
                code => "map['compositeOnly'] += event.get('finalOnly')
                "
                map_action => "update"
                end_of_task => true
                timeout => 10
            }
        }

    }
}
output {
    file {
       path => "/var/log/logstash/tavi.log"
    }
}

Output file:

{"type":"tower-logs","message":"{\"timestamp\": \"2023-07-31T14:10:45.141Z\", \"parentOnly\": 1, \"logger_name\": \"activity_stream\", \"job\": 102693, \"type\": \"tower-logs\"}","path":"/var/log/logstash/input.log","linca":{"timestamp":"2023-07-31T14:10:45.141Z","job":102693,"logger_name":"activity_stream","parentOnly":1,"type":"tower-logs"},"@version":"1","@timestamp":"2023-07-31T14:01:35.307Z","host":"IP_HOST"}
{"type":"tower-logs","message":"{\"timestamp\": \"2023-07-31T14:10:48.392Z\", \"childOnly\": 1, \"logger_name\": \"job_events\", \"job\": 102693, \"type\": \"tower-logs\"}","path":"/var/log/logstash/input.log","linca":{"timestamp":"2023-07-31T14:10:48.392Z","job":102693,"logger_name":"job_events","type":"tower-logs","childOnly":1},"@version":"1","@timestamp":"2023-07-31T14:01:35.308Z","host":"IP_HOST"}
{"type":"tower-logs","message":"{\"timestamp\": \"2023-07-31T14:10:50.392Z\", \"finalOnly\": 1, \"logger_name\": \"maindispatch\", \"job\": 102693, \"type\": \"tower-logs\"}","path":"/var/log/logstash/input.log","linca":{"timestamp":"2023-07-31T14:10:50.392Z","job":102693,"logger_name":"maindispatch","finalOnly":1,"type":"tower-logs"},"@version":"1","@timestamp":"2023-07-31T14:01:35.308Z","host":"IP_HOST"}
{"type":"tower-logs","message":"","path":"/var/log/logstash/input.log","linca":null,"@version":"1","@timestamp":"2023-07-31T14:01:35.309Z","host":"IP_HOST"}

What I would like to obtain: a 4th event that contains a compositeOnly field in which the values from parentOnly, childOnly and finalOnly have been summed.

Could you please advise? Thank you!

Your first problem is that you specified a target for the json filter (which is a good idea) but you are referencing the created fields as if they are at the root. So if [logger_name] == "activity_stream" needs to be if [linca][logger_name] == "activity_stream" or else none of the aggregate filters are executed.

Likewise, none of the aggregate filters will do anything, because the [job] field does not exist; you should use task_id => "%{[linca][job]}".

The logger_name is not "dispatch"; you should use if [linca][logger_name] == "maindispatch", or a regexp (=~) if you want to match both "dispatch" and "maindispatch".
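
For example, the first conditional would end up looking something like this (a sketch of the same block with the [linca] prefix added; note that the event.get calls need the same prefix, since parentOnly, childOnly and finalOnly also sit under linca):

    if [linca][logger_name] == "activity_stream" {
        aggregate {
            task_id => "%{[linca][job]}"
            code => "map['compositeOnly'] = event.get('[linca][parentOnly]')"
            map_action => "create"
        }
    }

The same change applies to the job_events and maindispatch blocks.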

If you want to create a fourth event, remove end_of_task => true and add push_map_as_event_on_timeout => true. You may also want to add timeout_task_id_field => "[linca][job]" which will get you a fourth event that looks like

{
"compositeOnly" => 3,
     "@version" => "1",
        "linca" => {
    "job" => "102693"
    },
   "@timestamp" => 2023-07-31T19:38:05.801464153Z
}
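
Putting the pieces together, the final aggregate block would look roughly like this (a sketch that keeps your original 10 second timeout):

    if [linca][logger_name] == "maindispatch" {
        aggregate {
            task_id => "%{[linca][job]}"
            code => "map['compositeOnly'] += event.get('[linca][finalOnly]')"
            map_action => "update"
            timeout => 10
            push_map_as_event_on_timeout => true
            timeout_task_id_field => "[linca][job]"
        }
    }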

You might want to mutate [linca][job] to be an integer to match the other events, or you could let Elasticsearch do that conversion.
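
If you go the Logstash route, a mutate filter along these lines would do it (a minimal sketch):

    mutate {
        convert => { "[linca][job]" => "integer" }
    }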

Note that you are assuming that events will be ordered. Depending on how pipeline.ordered evaluates, that may be true, but it may cease to be true in the next major version.
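
If you do rely on ordering, the relevant settings are per pipeline, for example in logstash.yml or in the pipeline's entry in pipelines.yml, something like:

    pipeline.workers: 1
    pipeline.ordered: true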

Thank you very much for your fast response.
I am getting the following aggregation errors now:

[2023-08-03T12:20:00,271][ERROR][logstash.filters.aggregate][main][10e7af1364816d22a20] Aggregate exception occurred {:error=>#<NoMethodError: undefined method `+' for nil:NilClass>, :code=>"map['compositeOnly'] += event.get('childOnly')\n                ", :map=>{"compositeOnly"=>nil}, :event_data=>{"path"=>"/var/log/logstash/input.log", "@timestamp"=>2023-08-03T12:20:00.202Z, "@metadata"=>{"path"=>"/var/log/logstash/input.log", "host"=>"IP_HOST"}, "@version"=>"1", "host"=>"IP_HOST", "message"=>"{\"timestamp\": \"2023-08-03T19:10:48.392Z\", \"childOnly\": 1, \"logger_name\": \"job_events\", \"job\": 102693, \"type\": \"tower-logs\"}", "type"=>"tower-logs", "linca"=>{"childOnly"=>1, "logger_name"=>"job_events", "job"=>102693, "type"=>"tower-logs", "timestamp"=>"2023-08-03T19:10:48.392Z"}}}

[2023-08-03T12:20:00,289][ERROR][logstash.filters.aggregate][main][35cce70d2037d7f689a11919aef5dcb63e] Aggregate exception occurred {:error=>#<NoMethodError: undefined method `+' for nil:NilClass>, :code=>"map['compositeOnly'] += event.get('finalOnly')\n                ", :map=>{"compositeOnly"=>nil}, :event_data=>{"path"=>"/var/log/logstash/input.log", "@timestamp"=>2023-08-03T12:20:00.203Z, "@metadata"=>{"path"=>"/var/log/logstash/input.log", "host"=>"IP_HOST"}, "@version"=>"1", "host"=>"IP_HOST", "message"=>"{\"timestamp\": \"2023-08-03T19:10:50.392Z\", \"finalOnly\": 1, \"logger_name\": \"maindispatch\", \"job\": 102693, \"type\": \"tower-logs\"}", "type"=>"tower-logs", "linca"=>{"finalOnly"=>1, "logger_name"=>"maindispatch", "job"=>102693, "type"=>"tower-logs", "timestamp"=>"2023-08-03T19:10:50.392Z"}}}

My events do not come in order. What I presented here is a simplified scenario of what I really want to achieve in a more complex log aggregation. Could you please tell me what I should implement if the events for one job ID come interleaved with events that have a different job ID?

Thank you once more!

Did you set pipeline.workers to 1 for this pipeline with the aggregate filter?

I do not use aggregate much, but if I'm not wrong the events need to be ordered, and running it with only 1 worker (1 CPU core) is required.

I think that this is not an issue, but the events for a specific task id must be ordered. If I'm not wrong, Logstash will create a map for each task id and will aggregate by them.

In this case you can rewrite the aggregate so that order does not matter. You definitely need pipeline.workers 1.

    json {
        source => "message"
        target => "linca"
    }
    aggregate {
        task_id => "%{[linca][job]}"
        code => '
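            # initialize the running total the first time an event for this job arrives,
            # so the parent/child/final events can be processed in any order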
            map["compositeOnly"] ||= 0

            parentOnly = event.get("[linca][parentOnly]")
            if parentOnly
                map["compositeOnly"] += parentOnly
            end

            childOnly = event.get("[linca][childOnly]")
            if childOnly
                map["compositeOnly"] += childOnly
            end

            finalOnly = event.get("[linca][finalOnly]")
            if finalOnly
                map["compositeOnly"] += finalOnly
            end
        '
        timeout => 10
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "[linca][job]"
    }

Hi!

pipeline.workers: 1 was something that I had set from the beginning.

I've implemented your code @Badger and these are my results:

{"message":"{\"timestamp\": \"2023-08-10T19:10:45.141Z\", \"parentOnly\": 1, \"logger_name\": \"activity_stream\", \"job\": 102693, \"type\": \"tower-logs\"}","path":"/var/log/logstash/input.log","type":"tower-logs","@version":"1","linca":{"parentOnly":1,"logger_name":"activity_stream","job":102693,"type":"tower-logs","timestamp":"2023-08-10T19:10:45.141Z"},"@timestamp":"2023-08-10T10:52:37.781Z","host":"IP_HOST"}
{"message":"{\"timestamp\": \"2023-08-10T19:10:48.392Z\", \"childOnly\": 1, \"logger_name\": \"job_events\", \"job\": 102693, \"type\": \"tower-logs\"}","path":"/var/log/logstash/input.log","type":"tower-logs","@version":"1","linca":{"logger_name":"job_events","childOnly":1,"job":102693,"timestamp":"2023-08-10T19:10:48.392Z","type":"tower-logs"},"@timestamp":"2023-08-10T10:52:37.802Z","host":"IP_HOST"}
{"message":"{\"timestamp\": \"2023-08-10T19:10:50.392Z\", \"finalOnly\": 1, \"logger_name\": \"maindispatch\", \"job\": 102693, \"type\": \"tower-logs\"}","path":"/var/log/logstash/input.log","type":"tower-logs","@version":"1","linca":{"logger_name":"maindispatch","job":102693,"finalOnly":1,"timestamp":"2023-08-10T19:10:50.392Z","type":"tower-logs"},"@timestamp":"2023-08-10T10:52:37.803Z","host":"IP_HOST"}
{"message":"","path":"/var/log/logstash/input.log","type":"tower-logs","@version":"1","linca":null,"@timestamp":"2023-08-10T10:52:37.804Z","host":"IP_HOST"}
{"compositeOnly":3,"@version":"1","linca":{"job":"102693"},"@timestamp":"2023-08-10T10:52:51.848Z"}

The last log:

{"compositeOnly":3,"@version":"1","linca":{"job":"102693"},"@timestamp":"2023-08-10T10:52:51.848Z"}

as you can see, comes separately from the one above, which has an empty "message" field and "null" in the "linca" field. It also appears a few seconds later (this can also be seen in the timestamps). Is this expected behaviour?

Thank you very much for your help!

Yes.

Thank you for your help! As I said, the example here is a simplified one from a more complex scenario. Could I share it in this thread or should I open a new thread and close this one as fixed?

If you have a new question then please open a new thread.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.