As I am only starting to test with ElasticStack, I have only configured 3 hosts with filebeat installed to forward apache logs to logstash. My grok filter looks like this:
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} \"%{WORD:request_method} %{URIPATHPARAM:uri_path} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} (?:%{QS:referer}|-) %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}" }
  }
  geoip {
    source => "client_ip"
  }
}
output {
  if "_grokparsefailure" in [tags] {
    file { path => "/var/log/logstash/failed/failed_apache_events-%{+YYYY-MM-dd}" }
  }
  elasticsearch {
    hosts => [ "10.1.0.20:9200" ]
  }
}
As you can see, I've configured it to write any event tagged with _grokparsefailure to a separate failure log. Only one log format ends up in that failure log, but it occurs over 20,000 times per day, and it looks like this:
67.171.49.122 - - [09/Mar/2017:07:18:45 -0700] "-" 443 "-" 408 "-" "-" 568 137
I have built a pattern that correctly matches each field, like so:
%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} %{QS:request_method} %{NUMBER:response} %{QS:referer} %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}
but I do not know how to say something like "if the log contains _grokparsefailure, then try this filter".
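The approach I was picturing would look something like the sketch below: run the second grok only on events the first one tagged. This is untested on my part, and I'm assuming remove_tag clears the tag when the fallback pattern succeeds:

```
filter {
  grok {
    # first attempt: the normal apache pattern from my config above
    match => { "message" => "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} \"%{WORD:request_method} %{URIPATHPARAM:uri_path} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} (?:%{QS:referer}|-) %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}" }
  }
  if "_grokparsefailure" in [tags] {
    grok {
      # fallback: the pattern for the "-" request lines
      match => { "message" => "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} %{QS:request_method} %{NUMBER:response} %{QS:referer} %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}" }
      # if this pattern matches, drop the tag left by the first attempt
      remove_tag => ["_grokparsefailure"]
    }
  }
}
```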
A grok filter can be given multiple expressions. They will be tried in order, first match wins.
match => { "message" => ["expression1", "expression2"] }
I marked this as solved because what I changed based on your answer did work, but I think I did something wrong, as I am now getting 4000 logs per minute in Elasticsearch rather than my expected 200...
I did this:
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} \"%{WORD:request_method} %{URIPATHPARAM:uri_path} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} (?:%{QS:referer}|-) %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}" }
    match => { "message" => "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} %{QS:request_method} %{NUMBER:response} %{QS:referer} %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}" }
  }
  geoip {
    source => "client_ip"
  }
}
output {
  elasticsearch {
    hosts => [ "10.1.0.20:9200" ]
  }
}
Which, re-reading your post, does not look like what you actually suggested. Can you give me a better example of what I would do in my case?
I marked this as solved because what I changed based on your answer did work, but I think I did something wrong, as I am now getting 4000 logs per minute in Elasticsearch rather than my expected 200...
Unless you actually have that amount of traffic I can think of two reasons:
- You have a clone filter in your configuration (which I don't think you have).
- You accidentally have multiple copies of your elasticsearch output. Logstash reads and concatenates all configuration files in /etc/logstash/conf.d (or whatever directory you tell it to read), so a stray extra file adds extra outputs.
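For example, if the directory happened to contain both your pipeline file and a forgotten backup of it (hypothetical file names below), Logstash would merge them into one pipeline and every event would be indexed twice:

```
# /etc/logstash/conf.d/apache.conf
output {
  elasticsearch { hosts => [ "10.1.0.20:9200" ] }
}

# /etc/logstash/conf.d/apache.conf.bak -- also read, since it sits in the same directory
output {
  elasticsearch { hosts => [ "10.1.0.20:9200" ] }
}
```

Running `ls /etc/logstash/conf.d` and checking for leftover copies is a quick way to rule this out.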
Your grok filter setup with two match settings probably works, but I prefer the documented syntax shown in my previous example.
I don't have any other filters in my conf.d directory. All I know is, at point #1 I added the second "match" statement that was supposed to catch all the logs that had a _grokparsefailure. At point #2 is when I raised the issue on this forum. At point #3 I removed the second "match" statement. Maybe Elasticsearch was going back through all those logs tagged as _grokparsefailure and re-indexing them according to the new filter?
Can you give a more thorough example of your preferred approach using my filter above? I don't quite understand...
I don't have any other filters in my conf.d directory. All I know is, ...
Hmm. Maybe specifying two match options works differently than I thought.
Maybe Elasticsearch was going back through all those logs tagged as _grokparsefailure and re-indexing them according to the new filter?
No, it won't do that by itself.
Can you give a more thorough example of your preferred approach using my filter above? I don't quite understand...
grok {
  match => {
    "message" => [
      "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} \"%{WORD:request_method} %{URIPATHPARAM:uri_path} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} (?:%{QS:referer}|-) %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}",
      "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:time_stamp}\] %{QS:web_site} %{NUMBER:server_port} %{QS:request_method} %{NUMBER:response} %{QS:referer} %{QS:user_agent} %{NUMBER:bytes_received} %{NUMBER:bytes_sent}"
    ]
  }
}