Filter with select in logstash

Hi Community,
I would like to have a filter to dissect my logs and aggregate them like this

SELECT COUNT(CustomerID), Country
FROM Customers
GROUP BY Country;

Could you please help me?

=> in the logname are countries for example like: user 123 in Germany logelevel: Info /path...

       filter {
dissect {mapping => {"message" => "[%{logdate}] %{application}.%{loglevel}: %{logname} /%{path}"}}
       
 if "info" in [loglevel] {
        
          aggregate {
               task_id => "%{logname}"
               code => "
                 map['logname'] ||= 0; map['logname'] += 1;
                 map['logname'] ||= event.get('logname')
                 map['loglevel'] ||= []
                 map['loglevel'] << {'loglevel' => event.get('loglevel')}
                 event.cancel()
               "
               push_previous_map_as_event => true
               timeout => 3
             }
         }else{
         dissect { mapping => { "message" => "[%{logdate}] %{exception}.%{errorlogname}.%{errorCode}.%{loglevel}: %{errorMessage} /%{path}" } }
        }

Why don't do it in Elasticsearch?

What does the data you are trying to aggregate look like?

[2020-11-02 12:44:04] application.INFO: Today is beautiful weather /path/....
[2020-11-02 12:44:05] application.INFO: Today is beautiful weather /path/....
[2020-11-02 12:44:06] application.INFO: today rainbow /path/....
[2020-11-02 12:44:06] Chttpexception.ERROR: 34 rainy day /path/....

How :slight_smile:

please help :grimacing:

You could try

    dissect {mapping => {"message" => "[%{logdate}] %{application}.%{loglevel}: %{}"}}
    if "INFO" in [loglevel] {
        dissect {mapping => {"message" => "[%{logdate}] %{application}.%{loglevel}: %{logname} /%{path}"}}
        aggregate {
           task_id => "%{logname}"
           code => "
             map['logname'] ||= 0; map['logname'] += 1;
             map['logname'] ||= event.get('logname')
             map['loglevel'] ||= []
             map['loglevel'] << {'loglevel' => event.get('loglevel')}
             event.cancel()
           "
           push_previous_map_as_event => true
           timeout => 3
         }
    }else{
        dissect { mapping => { "message" => "[%{logdate}] %{application}.%{loglevel}: %{errorCode} %{errorMessage} /%{path}" } }
    }

Note that the

map['logname'] ||= event.get('logname')

will never perform the assignment, because the previous line has ensured that map['logname'] is never nil.

thank you very much, Ill try it and let you know !! :star_struck:
I see so .. :face_with_monocle:

Hello,

i tried but the problem is, that the logstash is not going in to the if condition.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.