Logstash aggregate field and increase count

Hello everyone.
I am here again. I would like to start by thanking every single one of you for this amazing community.

These days I have been testing elasticsearch and logstash, and I am falling in love with how many things I can achieve with these two tools; it is impressive.

In my testing environment using elasticsearch, logstash and grafana, I am trying to aggregate similar fields within a specific time range, to save disk space and optimise data visualisation. To explain myself better, I will give an example.

Currently I have some junk syslog generated by kiwi syslog generator. The fields are

timestamp
message

What I want to do is, if I have 2 identical messages generated in the last 10 min, group them into 1 line and add a count column that reflects how many times that message occurred in the last 10 min.

example:

before:

timestamp    message
13:54:24     hello
13:54:35     hello

after:

timestamp    message    count
13:54:35     hello      2
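To make the grouping rule concrete, here is a small sketch in plain Ruby (outside Logstash) of the transformation described above; the 10-minute window, the field names, and the `aggregate` helper are just this example's assumptions, not anything Logstash provides:

```ruby
require 'time'

# Input events: (timestamp, message) pairs, as in the "before" table above.
events = [
  { timestamp: Time.parse('13:54:24'), message: 'hello' },
  { timestamp: Time.parse('13:54:35'), message: 'hello' },
]

WINDOW = 10 * 60 # 10 minutes, in seconds

# Collapse runs of identical messages whose timestamps fall within the
# window, keeping the latest timestamp and counting the occurrences.
def aggregate(events, window)
  events.sort_by { |e| e[:timestamp] }.each_with_object([]) do |e, out|
    last = out.last
    if last && last[:message] == e[:message] &&
       e[:timestamp] - last[:timestamp] <= window
      last[:timestamp] = e[:timestamp] # keep the most recent timestamp
      last[:count] += 1
    else
      out << { timestamp: e[:timestamp], message: e[:message], count: 1 }
    end
  end
end

aggregate(events, WINDOW).each do |e|
  puts "#{e[:timestamp].strftime('%H:%M:%S')}  #{e[:message]}  #{e[:count]}"
end
# → 13:54:35  hello  2
```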

I checked the documentation and saw that logstash offers the aggregate filter plugin, but I was wondering if there is an option to specify a timespan within which those events occur.

Thank you very much for your time

EDIT:
I went through the documentation to implement the timeout aggregation as follows:

input {
  syslog {
    port => 514
  }
}
filter {
  prune {
    whitelist_names => ["timestamp", "message", "newfield", "count_message"]
  }
  mutate {
    add_field => { "newfield" => "%{@timestamp}%{message}" }
  }
  if [message] =~ "MESSAGE" {
    aggregate {
      task_id => "%{message}"
      code => "map['message'] ||= 0; map['message'] += 1;"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "message"
      timeout => 60
      inactivity_timeout => 50
      timeout_tags => ['_aggregatetimeout']
      timeout_code => "event.set('count_message', event.get('message') > 1)"
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash_index"
  }
  stdout {
    codec => rubydebug
  }
}

The output is similar to what I am expecting, but not 100% correct.
The actual output duplicates every row, adding the tag _aggregatetimeout to the duplicates.

example:

if I have those 3 logs:

timestamp    message
13:54:24     MESSAGE
13:54:35     MESSAGE
13:54:40     MESSAGE

as a result I am getting

timestamp    message    tags
13:55:24     MESSAGE    _aggregatetimeout
13:55:24     MESSAGE    _aggregatetimeout
13:55:24     MESSAGE    _aggregatetimeout

13:54:24     MESSAGE
13:54:35     MESSAGE
13:54:40     MESSAGE

Can anyone please help me understand how I can get the count of duplicate events in a specific time range?

The aggregate filter does not change the events that pass through it (unless you do so in the code option), so the original events will all get indexed unless you call event.cancel in the code option.
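As a rough sketch of that suggestion (untested, field names taken from the config above), cancelling the originals inside the code option would look something like:

```
aggregate {
  task_id => "%{message}"
  code => "
    map['count_message'] ||= 0
    map['count_message'] += 1
    event.cancel  # drop the original event so only the timeout event is indexed
  "
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "message"
  timeout => 60
}
```

With event.cancel in place, only the aggregated timeout event reaches the output, which matches the "after" table in the original question.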

When the timeout occurs the aggregate filter will generate one event for each entry in the map. It will not generate three. It seems likely that you have redacted the cause of the issue.

The timeout_timestamp_field option might be relevant.

Yes, you are right. It was my mistake: I was checking the wrong column.
The approach mentioned above did work, but I have just a question about this.

When the aggregation occurs after the timeout, I have the correct count in the message field, like this:

message
6

I was wondering if there is a way to print the count in a separate field and keep the aggregated message text in the message field, so I know which message the count refers to.

Thank you very much for your patience

Sorry, this is the updated configuration:

input {
  syslog {
    port => 514
  }
}
filter {
  prune {
    whitelist_names => ["timestamp", "message", "newfield", "event_count"]
  }
  mutate {
    add_field => { "newfield" => "%{@timestamp}%{message}" }
  }
  if [message] =~ "MESSAGE" {
    aggregate {
      task_id => "%{message}"
      code => "map['message'] ||= 0; map['message'] += 1;"
      push_map_as_event_on_timeout => true
      timeout_timestamp_field => "@timestamp"
      timeout => 60
      inactivity_timeout => 50
      timeout_tags => ['_aggregatetimeout']
      timeout_code => "event.set('event_count', event.get('message') > 1)"
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash_index"
  }
  stdout {
    codec => rubydebug
  }
}

The way you are using timeout_task_id_field means [message] will be preserved. If you want the count in a different field then remove the timeout_code option and change the code option to be

code => '
    map["message_count"] ||= 0
    map["message_count"] += 1
'
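Putting that together, the whole aggregate block might look like this (an untested sketch combining the advice above with the poster's earlier config; it restores timeout_task_id_field so the message text is preserved on the timeout event):

```
aggregate {
  task_id => "%{message}"
  code => '
    map["message_count"] ||= 0
    map["message_count"] += 1
  '
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "message"        # copies the task_id back into [message]
  timeout_timestamp_field => "@timestamp"
  timeout => 60
  inactivity_timeout => 50
  timeout_tags => ['_aggregatetimeout']
}
```

Note that the prune whitelist in the config above would also need "message_count" added, otherwise the new field gets stripped before it reaches the output.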

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.