Strange log parsing

Hello,
I am in the process of learning Logstash for parsing particular logs, and for some reason not all of my logs are parsed as they should be. I would be very grateful for help in finding out what I am doing wrong.

With a config like this:


    input {
        tcp {
            host => "..."
            port => 5001
            type => "syslog"
        }
        udp {
            host => "..."
            port => 5001
            type => "syslog"
        }
    }

    # Main filter: expects the raw syslog line, including the <pri> prefix.
    filter {
        if [type] == "syslog" {
            grok {
                break_on_match => false
                match => [ "message" , "<%{NONNEGINT:syslog_pri}> %{SYSLOGTIMESTAMP:syslog_timestamp}  %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\]) ?: %{GREEDYDATA:syslog_message}"]
                add_field => [ "received_at", "%{@timestamp}" ]
                add_field => [ "received_from", "%{host}" ]
            }
        }

        syslog_pri { }
        if "_grokparsefailure" in [tags] {
            mutate {
                add_tag => "cannot_be_parsed_by_main_filter"
            }
        }

        mutate {
            rename => { "syslog_message" => "message" }
        }
        date {
            # single- and double-space day-of-month forms of the syslog timestamp
            match => ["syslog_timestamp", "ISO8601", "MMM d HH:mm:ss", "MMM  d HH:mm:ss"]
            remove_field => ["syslog_timestamp"]
        }
    }

    # First additional filter: only runs for events the main grok did not parse.
    filter {
        if "cannot_be_parsed_by_main_filter" in [tags] {
            grok {
                remove_tag => ["_grokparsefailure"]
                match => {"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program} ?: %{GREEDYDATA:syslog_message}"}
                add_field => [ "received_at", "%{@timestamp}" ]
                add_field => [ "received_from", "%{host}" ]
            }
        }

        if "_grokparsefailure" in [tags] {
            mutate {
                remove_tag => ["cannot_be_parsed_by_main_filter"]
                remove_tag => ["_grokparsefailure"]
                add_tag => ["cannot_be_parsed_by_added_filter_1"]
            }
        }

        mutate {
            rename => { "syslog_message" => "message" }
        }

    }

    # Second additional filter: only runs for events the first additional grok did not parse.
    filter {
        if "cannot_be_parsed_by_added_filter_1" in [tags] {
            grok {
                remove_tag => ["cannot_be_parsed_by_added_filter_1"]
                match => {"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(/etc/cron\.hourly\)\[%{POSINT:syslog_pid}] ?: %{GREEDYDATA:syslog_message}"}
                add_field => [ "received_at", "%{@timestamp}" ]
                add_field => [ "received_from", "%{host}" ]
            }
        }

        if "_grokparsefailure" in [tags] {
            mutate {
                remove_tag => ["cannot_be_parsed_by_added_filter_1"]
                remove_tag => ["_grokparsefailure"]
                add_tag => ["cannot_be_parsed_by_added_filter_2"]
            }
        }

        mutate {
            rename => { "syslog_message" => "message" }
        }

    }

    output {
        elasticsearch {
            hosts => [ "..." ]
            index => 'logstash-%{+YYYY.MM.dd}'
        }
    }

The second additional filter doesn't work, and most logs are not parsed by the main one for some reason.
I also only get `user` and `notice` as facility and severity.

Log samples:

1) For some reason these logs aren't parsed by the main filter, but by the first additional one:

    Apr 25 20:11:46 nkdk22 slurmd[1234]: Launching batch job 3540 for UID 12345
    Apr 25 20:15:12 head-testing xcat[12345]: xCAT: Allowing lsdef -t osimage for admin from localhost

2) Logs like this sometimes get parsed normally by the main filter and sometimes by the first additional filter, and I don't understand why, or how the program decides between them:

    Apr 25 20:19:25 n22p123 OpenSM[1234]: Errors during initialization

3) Logs like this don't get parsed by the second additional filter. I'm not sure the second additional filter is even reachable, but I don't know the reason:

    Apr 25 15:20:31 nkdk12 run-parts(/etc/cron.hourly)[1234 finished mcelog.cron

Thank you for your help!
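Added example, not part of the original post: a small Python script that translates the three grok patterns from the config above into regexes, using simplified stand-ins for the stock grok primitives (an assumption; the real Logstash definitions are broader), and runs the post's sample lines plus one constructed line through the same order as the config (main filter, then first additional, then second additional). It lets each line be traced offline to the stage that parsed it and to the fields that stage extracted.

```python
import re

# Simplified stand-ins for the stock grok primitives the post's patterns use.
# Assumption: close enough to the real Logstash pattern library for these samples.
PRIMITIVES = {
    "SYSLOGTIMESTAMP": r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) +\d{1,2} \d{2}:\d{2}:\d{2}",
    "SYSLOGHOST": r"[\w.-]+",
    "NONNEGINT": r"\b[0-9]+\b",
    "POSINT": r"\b[1-9][0-9]*\b",
    "DATA": r".*?",
    "GREEDYDATA": r".*",
}

def grok_to_regex(pattern: str) -> re.Pattern:
    """Expand %{NAME:field} into a named group and %{NAME} into a plain group."""
    def expand(m):
        name, field = m.group(1), m.group(2)
        body = PRIMITIVES[name]
        return f"(?P<{field}>{body})" if field else f"(?:{body})"
    return re.compile(re.sub(r"%\{(\w+)(?::(\w+))?\}", expand, pattern))

# The three grok patterns exactly as they appear in the post's config, in pipeline order.
STAGES = [
    ("main", r"<%{NONNEGINT:syslog_pri}> %{SYSLOGTIMESTAMP:syslog_timestamp}  %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\]) ?: %{GREEDYDATA:syslog_message}"),
    ("added_filter_1", r"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program} ?: %{GREEDYDATA:syslog_message}"),
    ("added_filter_2", r"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(/etc/cron\.hourly\)\[%{POSINT:syslog_pid}] ?: %{GREEDYDATA:syslog_message}"),
]
COMPILED = [(name, grok_to_regex(p)) for name, p in STAGES]

# Samples 0-2 are the post's lines; sample 3 is constructed to fit the main pattern:
# it carries the "<pri> " prefix and the two spaces before the hostname the pattern asks for.
SAMPLES = [
    "Apr 25 20:11:46 nkdk22 slurmd[1234]: Launching batch job 3540 for UID 12345",
    "Apr 25 20:19:25 n22p123 OpenSM[1234]: Errors during initialization",
    "Apr 25 15:20:31 nkdk12 run-parts(/etc/cron.hourly)[1234 finished mcelog.cron",
    "<13> Apr 25 20:19:25  n22p123 OpenSM[1234]: Errors during initialization",
]

def parse(line: str):
    """Mimic the post's fallback chain: the first stage whose grok matches wins.
    grok is unanchored, so search() rather than match() is the right analogue."""
    for name, regex in COMPILED:
        m = regex.search(line)
        if m:
            return name, m.groupdict()
    return "unparsed", {}

if __name__ == "__main__":
    for line in SAMPLES:
        print(parse(line))
```

Note how the first additional pattern leaves the `[pid]` inside `syslog_program`, because it has no separate `%{POSINT:syslog_pid}` capture.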
