Hi guys.
I hope somebody can help me understand a problem I am having while trying to aggregate a field based on multiple if conditions.
I have a field named message
which receives multiple events, for example:
message
user1
user2
user3
user1
user2
user3
I am trying to build an aggregation based on whether the message contains a specific string, and to visualise the count of that event.
I was able to achieve this with a single if statement as follows:
if [message] =~ "user1" {
  aggregate {
    task_id => "%{message}"
    code => "map['message'] ||= 0; map['message'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "message"
    timeout => 60
    inactivity_timeout => 50
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('count_message', event.get('message') > 1)"
  }
}
So far everything worked as expected.
But now I would like to analyse all the events and aggregate based on multiple conditions,
so I tried:
input {
  syslog {
    port => 514
  }
}
filter {
  prune {
    whitelist_names => ["timestamp", "message", "newfield", "count_message"]
  }
  mutate {
    add_field => {"newfield" => "%{@timestamp}%{message}"}
  }
  if [message] =~ "user1" {
    aggregate {
      task_id => "%{message}"
      code => "map['message'] ||= 0; map['message'] += 1;"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "message"
      timeout => 60
      inactivity_timeout => 50
      timeout_tags => ['_aggregatetimeout']
      timeout_code => "event.set('count_message', event.get('message') > 1)"
    }
  } else if [message] =~ "user2" {
    aggregate {
      task_id => "%{message}"
      code => "map['message'] ||= 0; map['message'] += 1;"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "message"
      timeout => 60
      inactivity_timeout => 50
      timeout_tags => ['_aggregatetimeout']
      timeout_code => "event.set('count_message', event.get('message') > 1)"
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash_index"
  }
  stdout {
    codec => rubydebug
  }
}
But when I try to run my configuration, I get this error:
[2021-07-23T14:57:31,692][ERROR][logstash.javapipeline ][main] Pipeline error {:pipeline_id=>"main", :exception=>#<LogStash::ConfigurationError: Aggregate plugin: For task_id pattern '%{message}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern. Timeout options are : timeout, inactivity_timeout, timeout_code, push_map_as_event_on_timeout, push_previous_map_as_event, timeout_timestamp_field, timeout_task_id_field, timeout_tags>, :backtrace=>["C:/logstash-7.13.0/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:103:in `block in register'", "org/jruby/ext/thread/Mutex.java:164:in `synchronize'", "C:/logstash-7.13.0/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:97:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:75:in `register'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:228:in `block in register_plugins'", "org/jruby/RubyArray.java:1809:in `each'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:227:in `register_plugins'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:586:in `maybe_setup_out_plugins'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:240:in `start_workers'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:185:in `run'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:137:in `block in start'"], "pipeline.sources"=>["C:/logstash-7.13.0/bin/elastic.conf"], :thread=>"#<Thread:0x30556acc run>"}
I do understand that this approach won't work, because for a given task_id pattern the timeout options can only be defined in one aggregate filter.
But I was wondering if somebody could help me understand how I can check multiple conditions for a specific task_id?
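One direction that might fit what the error message says, as an untested sketch only: fold both conditions into a single conditional with a regex alternation, so that only one aggregate filter defines the timeout options for the '%{message}' task_id pattern. The count_message map key is my own assumption here (a separate key so the count is not overwritten when timeout_task_id_field writes the task id back into message), and the timeout_code line is left out.

```
filter {
  # One conditional covering both users, so only one aggregate filter
  # carries the timeout options for the '%{message}' task_id pattern.
  if [message] =~ /user1|user2/ {
    aggregate {
      task_id => "%{message}"
      # Assumption: keep the counter under its own key, count_message,
      # instead of reusing 'message'.
      code => "map['count_message'] ||= 0; map['count_message'] += 1"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "message"
      timeout => 60
      inactivity_timeout => 50
      timeout_tags => ['_aggregatetimeout']
    }
  }
}
```

Since task_id is still "%{message}", each distinct message value would keep its own map and its own count; the conditional only decides which events reach the aggregate filter at all.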
Thank you so much, everyone.