Multi if condition on filter aggregation

Hi guys.
I hope somebody can help me understand a problem I am having while trying to aggregate a field based on a multi-if condition.

I have a field named message into which multiple events arrive. For example:

message
user1
user2
user3
user1
user2
user3

I am trying to make an aggregation based on whether the message contains a specific string, and to visualise the count of that event.

I was able to achieve this with a single if statement as follows:

if [message] =~ "user1" {
  aggregate {
    task_id => "%{message}"
    code => "map['message'] ||= 0; map['message'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "message"
    timeout => 60
    inactivity_timeout => 50
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('count_message', event.get('message') > 1)"
  }
}

So far everything worked as expected.

But now I would like to analyse all the events and aggregate based on multiple conditions,
so I tried:

input {
  syslog {
    port => 514
  }
}
filter {
  prune {
    whitelist_names => ["timestamp", "message", "newfield", "count_message"]
  }
  mutate {
    add_field => { "newfield" => "%{@timestamp}%{message}" }
  }
  if [message] =~ "user1" {
    aggregate {
      task_id => "%{message}"
      code => "map['message'] ||= 0; map['message'] += 1;"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "message"
      timeout => 60
      inactivity_timeout => 50
      timeout_tags => ['_aggregatetimeout']
      timeout_code => "event.set('count_message', event.get('message') > 1)"
    }
  } else if [message] =~ "user2" {
    aggregate {
      task_id => "%{message}"
      code => "map['message'] ||= 0; map['message'] += 1;"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "message"
      timeout => 60
      inactivity_timeout => 50
      timeout_tags => ['_aggregatetimeout']
      timeout_code => "event.set('count_message', event.get('message') > 1)"
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash_index"
  }
  stdout {
    codec => rubydebug
  }
}

but when I try to run my configuration, I get this error:

[2021-07-23T14:57:31,692][ERROR][logstash.javapipeline    ][main] Pipeline error {:pipeline_id=>"main", :exception=>#<LogStash::ConfigurationError: Aggregate plugin: For task_id pattern '%{message}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern. Timeout options are : timeout, inactivity_timeout, timeout_code, push_map_as_event_on_timeout, push_previous_map_as_event, timeout_timestamp_field, timeout_task_id_field, timeout_tags>, :backtrace=>["C:/logstash-7.13.0/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:103:in `block in register'", "org/jruby/ext/thread/Mutex.java:164:in `synchronize'", "C:/logstash-7.13.0/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:97:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:75:in `register'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:228:in `block in register_plugins'", "org/jruby/RubyArray.java:1809:in `each'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:227:in `register_plugins'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:586:in `maybe_setup_out_plugins'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:240:in `start_workers'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:185:in `run'", "C:/logstash-7.13.0/logstash-core/lib/logstash/java_pipeline.rb:137:in `block in start'"], "pipeline.sources"=>["C:/logstash-7.13.0/bin/elastic.conf"], :thread=>"#<Thread:0x30556acc run>"}

I do understand that this approach won't work, because only one aggregate filter per task_id pattern may define timeout options.

But I was wondering if somebody could help me understand how I can check multiple conditions for a specific task_id.

Thank you so much everyone

If your messages really just contain user1 or user2 then you do not need any conditionals; you can use the same aggregate filter for all of them. If you want to count the number of messages that contain strings like that, you would need to extract the string into another field, possibly using

grok { match => { "message" => "user%{POSINT:[@metadata][userid]}" } }

and then use

task_id => "[@metadata][userid]"

Again, using a single aggregate filter.
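To see why a single filter suffices, here is a minimal plain-Ruby sketch of what grok plus aggregate would do together (a hypothetical simulation for illustration, not the plugin's actual code): extract a user id from each message and keep one counter per id, the way aggregate keeps one map per task_id.

```ruby
# Sample messages standing in for the syslog input (hypothetical data).
messages = ["user1 logged in", "user2 logged in", "user1 logged out"]

counts = Hash.new(0)
messages.each do |msg|
  # Like grok's user%{POSINT:[@metadata][userid]}: capture the digits after "user".
  if (m = msg.match(/user(?<userid>\d+)/))
    counts[m[:userid]] += 1   # one counter (aggregate map) per task_id
  end
end

counts  # => {"1"=>2, "2"=>1}
```

One pass over all events is enough; the per-id grouping comes from the extracted field, not from conditionals.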

Thank you very much for your reply, I really do appreciate it.
The example I provided above was just from a testing environment. The real logs are a bit different; they might look like this:

message
The user <username> has successfully logged in
The user <username> logged out
Email send failed

And what I was trying to achieve is to check every single message for whether it contains logged in, logged out, or email send, and aggregate those messages into one.

Could you please advise me on the best approach to achieve this?

As I said before, you should grok the username out of the message, possibly using

grok { match => { "message" => "user %{WORD:[@metadata][userid]}" } }

and then aggregate on that.

Sorry, I know this will sound dumb and I apologise if I am slow to understand this, but I am totally new to Logstash.
I do see the point of the grok to filter the messages, but how can I set multiple matches if I need to?
Another question:

grok { match => { "message" => "user %{WORD:[@metadata][userid]}" } }

Regarding WORD: can I pass a string to this to look for a specific string, if it's contained in the message?

I am sorry to keep asking

It's not easy to answer when the question is unclear. It looks like you are trying to count messages. Are you trying to count the number of "logged in" and "logged out" and "email send" with separate counters? Or are you trying to count the number of messages for "username". I assumed the latter. If the former the solution will be a little different.

I am so sorry if my question is unclear but I really appreciate your patience.

Let me make an example to fully explain my case.

I will use your name for the example.

I have this logs

timestamp   message                               userId
19:05:45    User Badger(0001) has logged out.     0001
19:06:05    User Badger(0001) logged in.          0001
19:08:23    Workstation locked.

Let's say I receive hundreds of those logs.

What I want to do is check the message, and if the message contains User and the userId and it happened several times in a time range, increase the count of that event so I will know how many times that event occurred in the time range. To do the aggregation, the userId must be present in the message (and this is the case with the grok you mentioned).

Now, the timeout_timestamp_field works just fine and the aggregations are correct. But what I want is to have multiple aggregations based on the specific events, like logged in, logged out, workstation locked, and so on.

I hope I made myself clear enough this time; if you have more questions, please just ask.

A regular expression can be a fixed string. If you want to classify the message based on whether it contains one of several strings then you could try

grok {
    match => {
        "message" => [
            "(?<[@metadata][event_type]>logged out)",
            "(?<[@metadata][event_type]>logged in)",
            "(?<[@metadata][event_type]>workstation locked)"
        ]
    }
}
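The behaviour of that pattern list can be mimicked in plain Ruby (an illustrative sketch, not the grok implementation): try each fixed string in order and take the first one that matches as the event type.

```ruby
# Fixed strings used as regular expressions, mirroring the grok pattern list above.
PATTERNS = [/logged out/, /logged in/, /workstation locked/].freeze

def event_type(message)
  PATTERNS.each { |re| return re.source if message =~ re }
  nil  # no match; grok would instead tag the event with _grokparsefailure
end

event_type("User Badger(0001) has logged out.")  # => "logged out"
event_type("Email send failed")                  # => nil
```

A message matching none of the patterns gets no event_type, which is what later lets the aggregation skip unrelated messages.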

Hello mate.
Thank you so much for your solution; I am able to aggregate based on the grok match.
I have just one more concern, if you could please clarify it for me.

I updated my configuration with grok as follows:

filter {
  grok {
    match => {
      "message" => [
        "(?<[@metadata][event_type]>logged out)",
        "(?<[@metadata][event_type]>logged in)",
        "(?<[@metadata][event_type]>workstation locked)"
      ]
    }
  }
  aggregate {
    task_id => "%{message}"
    code => "map['message_count'] ||= 0; map['message_count'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_timestamp_field => "@timestamp"
    timeout => 60
    inactivity_timeout => 50
    timeout_tags => ['_aggregatetimeout']
  }
}

This makes the aggregation run on ALL the entries, which means that if I generate a log with message == MESSAGE, it also gets aggregated.

How can I restrict the aggregation to only what I declare in the grok?

match => {
  "message" => [
    "(?<[@metadata][event_type]>logged out)",
    "(?<[@metadata][event_type]>logged in)",
    "(?<[@metadata][event_type]>workstation locked)"
  ]
}

Thank you so much for your time once again

If you want to aggregate based on the event_type then use that as the task_id

task_id => "%{[@metadata][event_type]}"

If you want a sample message included in the generated event then you will have to add that to the map

map['message'] ||= event.get('message')
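Putting both suggestions together, here is a plain-Ruby sketch of the resulting aggregation (a hypothetical simulation with made-up sample events, not the plugin itself): one map per event_type task_id, counting occurrences and keeping the first sample message.

```ruby
# Hypothetical sample events, each already classified by the grok filter.
events = [
  { "message" => "User Badger(0001) has logged out.", "event_type" => "logged out" },
  { "message" => "User Badger(0001) logged in.",      "event_type" => "logged in"  },
  { "message" => "User Fox(0002) has logged out.",    "event_type" => "logged out" },
]

maps = {}
events.each do |event|
  map = (maps[event["event_type"]] ||= {})  # one aggregate map per task_id
  map["message_count"] ||= 0
  map["message_count"] += 1
  map["message"] ||= event["message"]       # keep a sample message, as suggested
end

maps["logged out"]
# => {"message_count"=>2, "message"=>"User Badger(0001) has logged out."}
```

Events without an event_type never create a map, which is what keeps unmatched messages out of the aggregation.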

Sir Badger, thank you very much for your help. That worked amazingly. I really don't know how to thank you for your patience with me. Thank you so, so much!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.