Logstash Aggregate plugin not working

I'm trying to use the aggregate filter plugin based on the documentation, but I'm having problems. Here's how I've configured it:

aggregate {
    task_id => "%{tunnelid}"
    code => "
        map['user'] = event.get('user')
        map['group'] = event.get('group')
        map['remip'] = event.get('remip')
        map['logs'] = []
        map['logs'] << {
            'action' => event.get('action'),
            'datetime' => event.get('datetime')
        }
        event.cancel()
    "
    map_action => "create"
    timeout => 30
    timeout_code => "event.set('state', 'timeout')"
    push_map_as_event_on_timeout => true
}

I've set pipeline.workers to 1 in the pipelines.yml configuration file, as shown below:

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
  pipeline.workers: 1

However, the output in Elasticsearch always ends up unformatted, like this:

[{'_index': 'sslvpnteste-2020.09.29', '_type': '_doc', '_id': 'fysk2nQBwfAlkZKXBBW9', '_score': 1.0, '_source': {'user': 'someuser', 'group': 'AcessoVpn', 'remip': '192.168.0.1', 'action': 'tunnel-down', 'datetime': '2020-09-29T10:51:47-0300',  'message': '<190>Sep 29 10:53:46 192.168.0.1 action="tunnel-down" tunnelid=862562769 remip=192.168.0.1 user="someuser" group="AcessoVpn"', 'log_type': 'sslvpnteste', 'tunnelid': '862562769'}}]

Where am I going wrong?

I do not see anything wrong. The _source has a bunch of fields in it. Do they not show up if you refresh the index in Kibana?

You might want to look at the timeout_task_id_field option on the aggregate filter if you want to keep the tunnelid.
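
As a rough sketch of where that option would go (only the relevant settings are shown, and using "tunnelid" as the target field name is an assumption based on your task_id):

aggregate {
    task_id => "%{tunnelid}"
    # ... code and the other options from the original filter go here ...
    timeout => 30
    push_map_as_event_on_timeout => true
    # Copies the task id (the tunnelid value) into this field of the timeout event.
    timeout_task_id_field => "tunnelid"
}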

@Badger According to my code I should have an array field called logs wherein action and datetime should be stored as a hash, but this map doesn't appear to be applied.

Indeed. Note that the event from Elasticsearch has a message field, but you never add [message] to the map. That is not an aggregated event; it is the event you are trying to aggregate. How that could get to Elasticsearch when you have event.cancel in the aggregate code option is beyond me.

Maybe you are not running the configuration you think you are running.

Sorry, but could you explain this part in more detail? I'm not sure I understood what you meant. I've already tried running the same code without event.cancel, but there's no difference in the output. Also, at least to me, the documentation is not very clear on what event.cancel actually does; I was assuming that the original (non-aggregated) event was thrown away and the aggregated event was preserved.

That is exactly what it does. If it is not doing it then event.cancel is not being called.

Remove

map_action => "create"

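With that option set, the filter executes the code only when no map exists yet for the task_id. For any later event with the same tunnelid it returns before executing the code, so event.cancel is never called and the original event passes through to the output.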
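
For reference, a minimal sketch of the filter with that line removed. The ||= on map['logs'] is my own addition and not part of the original configuration: once the code runs for every event, a plain map['logs'] = [] would presumably reset the array each time and keep only the last entry.

aggregate {
    task_id => "%{tunnelid}"
    # No map_action here: the default, "create_or_update", runs the code for every event.
    code => "
        map['user'] = event.get('user')
        map['group'] = event.get('group')
        map['remip'] = event.get('remip')
        # Assumption: initialize the array only once so earlier entries are kept.
        map['logs'] ||= []
        map['logs'] << {
            'action' => event.get('action'),
            'datetime' => event.get('datetime')
        }
        event.cancel()
    "
    timeout => 30
    timeout_code => "event.set('state', 'timeout')"
    push_map_as_event_on_timeout => true
}

With this in place, the aggregated event should only reach the output once no new event for that tunnelid has arrived for 30 seconds.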


Can't believe it was that simple! Thank you very much once again, @Badger.