Filter with select in Logstash

Hi Community,
I would like a filter that dissects my logs and aggregates them like this:

SELECT COUNT(CustomerID), Country
FROM Customers
GROUP BY Country;

Could you please help me?

The logname contains the country, for example: user 123 in Germany loglevel: Info /path...

    filter {
        dissect { mapping => { "message" => "[%{logdate}] %{application}.%{loglevel}: %{logname} /%{path}" } }

        if "info" in [loglevel] {
            aggregate {
                task_id => "%{logname}"
                code => "
                    map['logname'] ||= 0; map['logname'] += 1;
                    map['logname'] ||= event.get('logname')
                    map['loglevel'] ||= []
                    map['loglevel'] << {'loglevel' => event.get('loglevel')}
                    event.cancel()
                "
                push_previous_map_as_event => true
                timeout => 3
            }
        } else {
            dissect { mapping => { "message" => "[%{logdate}] %{exception}.%{errorlogname}.%{errorCode}.%{loglevel}: %{errorMessage} /%{path}" } }
        }
    }

Why not do it in Elasticsearch?

What does the data you are trying to aggregate look like?

[2020-11-02 12:44:04] application.INFO: Today is beautiful weather /path/....
[2020-11-02 12:44:05] application.INFO: Today is beautiful weather /path/....
[2020-11-02 12:44:06] application.INFO: today rainbow /path/....
[2020-11-02 12:44:06] Chttpexception.ERROR: 34 rainy day /path/....
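For reference, here is a minimal plain-Ruby sketch (not Logstash itself) of the GROUP BY-style count these four sample lines would be expected to produce for INFO entries. The regex is only a hypothetical approximation of the dissect mapping, and the names `LINES`, `PATTERN` and `counts` are made up for illustration.

```ruby
# Sample lines copied from the post above (the paths are elided in the original).
LINES = [
  '[2020-11-02 12:44:04] application.INFO: Today is beautiful weather /path/....',
  '[2020-11-02 12:44:05] application.INFO: Today is beautiful weather /path/....',
  '[2020-11-02 12:44:06] application.INFO: today rainbow /path/....',
  '[2020-11-02 12:44:06] Chttpexception.ERROR: 34 rainy day /path/....'
].freeze

# Hypothetical regex that approximates the dissect mapping
# "[%{logdate}] %{application}.%{loglevel}: %{logname} /%{path}".
PATTERN = %r{\A\[(?<logdate>[^\]]+)\] (?<application>[^.]+)\.(?<loglevel>[^:]+): (?<logname>.*?) /(?<path>.*)\z}

# Count INFO lines per logname, like SELECT COUNT(*) ... GROUP BY logname.
counts = Hash.new(0)
LINES.each do |line|
  m = PATTERN.match(line)
  next unless m && m[:loglevel] == 'INFO'
  counts[m[:logname]] += 1
end

counts.each { |logname, count| puts "#{count}  #{logname}" }
```

The ERROR line is skipped here because the sketch only counts entries whose loglevel is exactly `INFO`.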

How? :slight_smile:

Please help :grimacing:

You could try

    dissect {mapping => {"message" => "[%{logdate}] %{application}.%{loglevel}: %{}"}}
    if "INFO" in [loglevel] {
        dissect {mapping => {"message" => "[%{logdate}] %{application}.%{loglevel}: %{logname} /%{path}"}}
        aggregate {
           task_id => "%{logname}"
           code => "
             map['logname'] ||= 0; map['logname'] += 1;
             map['logname'] ||= event.get('logname')
             map['loglevel'] ||= []
             map['loglevel'] << {'loglevel' => event.get('loglevel')}
             event.cancel()
           "
           push_previous_map_as_event => true
           timeout => 3
         }
    }else{
        dissect { mapping => { "message" => "[%{logdate}] %{application}.%{loglevel}: %{errorCode} %{errorMessage} /%{path}" } }
    }

Note that the line

    map['logname'] ||= event.get('logname')

will never perform the assignment, because the previous line has ensured that map['logname'] is never nil.
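A small plain-Ruby sketch of that behaviour, outside Logstash: a Hash stands in for the aggregate filter's `map`, and a string stands in for `event.get('logname')`. The second half shows one possible variant that keeps the counter under a separate key; the key name `count` is an assumption, not something from the original configuration.

```ruby
# Stand-in for event.get('logname') in the aggregate filter.
event_logname = 'Today is beautiful weather'

# Original logic, run for two events with the same task_id.
map = {}
2.times do
  map['logname'] ||= 0; map['logname'] += 1
  # No-op: map['logname'] already holds an Integer, so ||= does not assign.
  map['logname'] ||= event_logname
end
puts map.inspect

# Possible variant: keep the counter under its own (hypothetical) key 'count'.
fixed = {}
2.times do
  fixed['count'] ||= 0
  fixed['count'] += 1
  fixed['logname'] ||= event_logname
end
puts fixed.inspect
```

With the separate key, the map carries both the number of events and the logname they were grouped by.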

Thank you very much, I'll try it and let you know! :star_struck:
I see. :face_with_monocle:

Hello,

I tried it, but the problem is that Logstash never enters the if condition.