Need help configuring my grok filters

As I am just starting to test the Elastic Stack, I have only configured 3 hosts with Filebeat installed to forward Apache logs to Logstash. My pipeline configuration looks like this:

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" =>"%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} \"%{WORD:request_method} %{URIPATHPARAM:uri_path} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} (?:%{QS:referer}|-) %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}"}
    }
    geoip {
        source => "client_ip"
    }
}
output {
    if "_grokparsefailure" in [tags] {
        file { path => "/var/log/logstash/failed/failed_apache_events-%{+YYYY-MM-dd}" }
    }
    elasticsearch {
        hosts => [ "10.1.0.20:9200" ]
    }
}

As you can see, I've configured it to write any event tagged with _grokparsefailure to a failure log. Only one log format ends up in that file, but it occurs over 20,000 times per day and looks like this:

67.171.49.122 - - [09/Mar/2017:07:18:45 -0700] "-" 443 "-" 408 "-" "-" 568 137

I have built a pattern that correctly matches each field, like so:

%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} %{QS:request_method} %{NUMBER:response} %{QS:referer} %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}

but I do not know how to express something like "if the log contains _grokparsefailure, then try this filter".

A grok filter can be given multiple expressions. They will be tried in order, first match wins.

match => { "message" => ["expression1", "expression"] }

I marked this as solved because what I changed based on your answer did work, but I think I did something wrong, as I am now getting 4,000 logs per minute in Elasticsearch rather than my expected 200...

I did this:

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" =>"%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} \"%{WORD:request_method} %{URIPATHPARAM:uri_path} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} (?:%{QS:referer}|-) %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}"}
        match => { "message" =>"%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} %{QS:request_method} %{NUMBER:response} %{QS:referer} %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}" }
    }
    geoip {
        source => "client_ip"
    }
}
output {
    elasticsearch {
        hosts => [ "10.1.0.20:9200" ]
    }
}

Which, re-reading your post, does not look like what you actually suggested. Can you give me a better example of what I would do in my case?

I marked this as solved because what I changed based on your answer did work, but I think I did something wrong, as I am now getting 4,000 logs per minute in Elasticsearch rather than my expected 200...

Unless you actually have that amount of traffic, I can think of two reasons:

  • You have a clone filter in your configuration (which I don't think you have).
  • You accidentally have multiple copies of your elasticsearch output. Logstash reads and concatenates all configuration files in /etc/logstash/conf.d (or whatever directory you tell it to read); see the sketch below.
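To illustrate the second case: if two files like these both sit in conf.d (the file names here are hypothetical), Logstash merges them into a single pipeline, and every event flows through both outputs, so each log line gets indexed twice:

# /etc/logstash/conf.d/apache.conf
output {
    elasticsearch { hosts => [ "10.1.0.20:9200" ] }
}

# /etc/logstash/conf.d/old-test.conf -- a forgotten leftover file
output {
    elasticsearch { hosts => [ "10.1.0.20:9200" ] }
}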

Your grok filter setup with two match settings probably works, but I prefer using the documented syntax from my previous example.

I don't have any other filters in my conf.d directory. All I know is: at point #1 I added the second "match" statement that was supposed to catch all the logs tagged with _grokparsefailure, point #2 is when I raised the issue on this forum, and at point #3 I removed the second "match" statement. Maybe Elasticsearch was going back through all those logs tagged as _grokparsefailure and re-indexing them according to the new filter?

Can you give a more thorough example of your preferred approach using my filter above? I don't quite understand...

I don't have any other filters in my conf.d directory. All I know is, ...

Hmm. Maybe specifying two match options works differently than I thought.

Maybe Elasticsearch was going back through all those logs tagged as _grokparsefailure and re-indexing them according to the new filter?

No, it won't do that by itself.

Can you give a more thorough example of your preferred approach using my filter above? I don't quite understand...

grok {
  match => {
    "message" => [
      "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} \"%{WORD:request_method} %{URIPATHPARAM:uri_path} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} (?:%{QS:referer}|-) %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}",
      "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} %{QS:request_method} %{NUMBER:response} %{QS:referer} %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}"
    ]
  }
}
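If you want to verify that the second expression actually catches the failing line before you deploy, one option (a minimal sketch; test.conf is a throwaway file name) is a pipeline that reads from stdin and prints the parsed event:

input { stdin {} }
filter {
    grok {
        match => {
            "message" => [
                "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} \"%{WORD:request_method} %{URIPATHPARAM:uri_path} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} (?:%{QS:referer}|-) %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}",
                "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} %{QS:request_method} %{NUMBER:response} %{QS:referer} %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}"
            ]
        }
    }
}
# Print each event with all parsed fields for inspection.
output { stdout { codec => rubydebug } }

Run it with bin/logstash -f test.conf, paste the failing log line from above into stdin, and check that the fields come out populated and the event carries no _grokparsefailure tag.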
