Grok filter not working on logstash

For my below filebeat config im not able to filter the logs based on tags in logstash

Filebeat Config

filebeat.inputs:
- type: log
  enabled: true
  fields:
    env: xyz-production
    app_name: xyz
    tags: ["nginx"]
  paths:
    - /var/log/nginx/access.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
output.logstash:
  hosts: ["xxx.xxx.xxx.xx:5044"]
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

Logstash Config

   input {
  beats {
    client_inactivity_timeout => 3000
    port => 5044
    ssl  => false
  }
}

filter {
  if "nginx" in [tags]{
      grok {
        match => { "message"=> '%{IP:Client_IP} %{NOTSPACE:Termination_State} %{NOTSPACE:Termination_state} \[%{HTTPDATE:timestamp}\] "%{WORD:Method} %{URIPATHPARAM:Http_Referer} HTTP/%{NUMBER:Http_Version}" %{INT:HTTP_STATUS_CODE} %{NOTSPACE:bytes_read} "%{DATA:Request_Url}"%{GREEDYDATA:Agent_Detail}"'}
      }
 }
    else if [fileset][name] == "error" {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }

Here the logs is not getting filter based on the grok filter applied.Please help me figure out this

Any Update on this guys

I think your tag either doesn't exist or it exists and the grok pattern doesn't match. It's always best to post the actual data, so that people have a chance to help you. What does the ruby debug output look like without any filters applied?

@Jenni

Here is my logtsash file

input {
  beats {
    client_inactivity_timeout => 3000
    port => 5044
    ssl  => false
  }
}
filter {
  if [fileset][module] == "nginx" {
    if [fileset][name] == "access" {
      grok {
        match => { "message" => '%{IP:Client_IP} %{NOTSPACE:Termination_State} %{NOTSPACE:Termination_state} \[%{HTTPDATE:timestamp}\] "%{WORD:Method} %{URIPATHPARAM:Http_Referer} HTTP/%{NUMBER:Http_Version}" %{INT:HTTP_STATUS_CODE} %{NOTSPACE:bytes_read} "%{DATA:Request_Url}"%{GREEDYDATA:Agent_Detail}"' }

                remove_field => "message"
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }
      date {
        match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "[nginx][access][time]"
      }
      useragent {
        source => "[nginx][access][agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][agent]"
      }
      geoip {
        source => "[nginx][access][remote_ip]"
        target => "[nginx][access][geoip]"
      }
    }
    else if [fileset][name] == "error" {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }  else if [fileset][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|\n)*"        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }    }
  }
  else if [fields][app_name] == "core" {
    grok {
      match => { "message" => "\[%{DATA:timestamp}\][ \t]*(\[%{DATA:log_level}\])?[ \t]*(\[%{DATA:request_id}\])?[ \t]*(%{DATA:class_name}:)?[ \t]*(%{GREEDYDATA:message})?" }
      overwrite => ["message"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["XXX.XXX.XX.XX:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    user => elastic
    password => "passwd"

Here is my filebeat file

filebeat.inputs:
- type: log
  enabled: true
  fields:
    env: qa
    app_name: core_frontend
  paths:
    - /var/log/*.log
    - /var/log/nginx/access.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
output.logstash:
  hosts: ["XXX.XXX.XX.X:5044"]
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

Here my nginx logs are not getting parsed with grok filter which i have used in the logstash.Where as have verified the pattern grokdebug

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.