G'day!
I'm new to ELK - please forgive me if it's something obvious.
I'd like filebeat to ship two different types of logs to logstash, have logstash process them with different grok filters, and output to elasticsearch in two different indices.
The problem I'm having is that filebeat ships all logs and logstash receives them, but only one index is getting created, i.e. app-catalina-log-.
My filebeat inputs are as follows:
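# Filebeat inputs. `fields.app` and `fields.log_type` are set here by configuration
# (they are not read from log content); the Logstash pipeline routes on them and
# builds the Elasticsearch index name from them, so the values must match the
# Logstash conditionals exactly (string comparison, case-sensitive).
filebeat.inputs:
- type: log
  paths:
    - /var/log/tomcat/*.log
  fields:
    app: app
    log_type: catalina-log
  # The original carried two `multiline.pattern` keys; a YAML mapping can only hold
  # one, so at most one of them can take effect. The pattern kept here is the one the
  # Logstash grok expression relies on (`\n%{LOGLEVEL:logLevel}:` on the second line
  # of each catalina record). To also fold indented stack-trace lines, merge the
  # former first pattern into a single regex instead of adding a second key:
  # multiline.pattern: '^[[:space:]]'
  multiline.pattern: '^([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?):'
  # negate: false + match: after -> a line that matches the pattern (the
  # "LEVEL: message" line) is appended to the line that precedes it.
  multiline.negate: false
  multiline.match: after
- type: log
  enabled: true
  paths:
    - /var/log/tomcat/*.txt
  fields:
    app: app
    log_type: tomcat-access-log
output.logstash:
  # The host part is empty as posted (":5044"); give Filebeat an explicit
  # "host:port" for the Logstash beats input. No ssl.* settings are present, so
  # events travel in clear text; when Logstash is not on the same host, set
  # ssl.certificate_authorities (and ssl.certificate / ssl.key for client auth)
  # here and the matching ssl options on the Logstash beats input.
  hosts: [":5044"]
# ... (remaining Filebeat settings omitted by the author)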
My logstash config:
# Pipeline: receive events from Filebeat, grok them per log_type, and write each
# log_type to its own daily Elasticsearch index.
input {
# No ssl => true / ssl_certificate / ssl_key here, so the beats listener accepts
# clear-text connections on 5044 from any client that can reach the port.
beats {
port => 5044
}
}
filter {
# Routing key. Filebeat (as posted) sets fields.app to "app", not "mapleta"; if
# that is not just anonymisation of the post, this outer condition never holds,
# none of the grok/date/mutate filters below run, and every event reaches the
# output unparsed (the output's index name does not depend on this block).
if ([fields][app] == "mapleta") {
if ([fields][log_type] == "catalina-log") {
# Expects the two-line record Filebeat's multiline setting produces:
# "<datestamp> <class> <method>\n<LEVEL>: <message>".
grok {
match => [ "message", "%{CATALINA_DATESTAMP:timestamp} %{JAVACLASS:class} %{NOTSPACE:methodName}\n%{LOGLEVEL:logLevel}: %{GREEDYDATA:logMessage}" ]
remove_field => [ "message" ]
}
# The catalina timestamp carries no zone, so one is supplied explicitly.
date {
timezone => "Australia/Sydney"
match => [ "timestamp", "MMM dd, YYYY KK:mm:ss a" ]
target => "@timestamp"
remove_field => [ "timestamp" ]
}
}
if ([fields][log_type] == "tomcat-access-log") {
# Only IPv4 clients and the literal "HTTP/1.1" token match; any other request
# line leaves [message] in place and is dropped by the check at the end of this
# block.
grok {
match => [ "message" , "%{IPV4:clientIP} - %{NOTSPACE:user} \[%{DATA:timestamp}\] \"%{WORD:method} %{NOTSPACE:request} HTTP/1.1\" %{NUMBER:status} %{NUMBER:bytesSent}" ]
remove_field => [ "message" ]
}
# Extracts the first path segment of the request into the top-level [app]
# field. This is distinct from [fields][app] (set by Filebeat), so it does not
# influence the index name below; failure is deliberately untagged.
grok{
match => [ "request", "/%{USERNAME:app}/" ]
tag_on_failure => [ ]
}
# Access-log timestamps carry their own zone offset ("Z"), so none is forced.
date {
match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
remove_field => [ "timestamp" ]
}
mutate {
lowercase => [ "user" ]
# "duration" is not produced by the grok pattern above; its conversion is a
# no-op unless the field is set elsewhere.
convert => [ "bytesSent", "integer", "duration", "float" ]
# [beat][hostname] is the Beats 6.x field name; newer Beats put this under
# [host][name] / [agent] -- confirm against the Filebeat version in use.
update => { "host" => "%{[beat][hostname]}" }
# Removing "tags" also discards "_grokparsefailure", which is why grok failure
# is detected via the surviving [message] field below instead.
remove_field => [ "beat","type","geoip","input_type","tags" ]
}
if [user] == "-" {
mutate {
remove_field => [ "user" ]
}
}
# drop unmatching message (like IPv6 requests)
# Any event that still has [message] here is one the access-log grok failed on.
if [message] =~ /(.+)/ {
drop { }
}
}
}
}
output {
# Plain HTTP to a local node with no user/password or ssl/cacert shown; fine for
# a single-host lab, not for a node reachable from elsewhere.
elasticsearch {
hosts => ["localhost:9200"]
manage_template => false
# Index name is built from the Filebeat-configured fields, not from log
# content. If either field is absent on an event, Logstash keeps the literal
# "%{[fields][...]}" text in the index name, and "%{+YYYY.MM.dd}" is derived
# from @timestamp (the parsed log time, not the receive time).
index => "%{[fields][app]}-%{[fields][log_type]}-%{+YYYY.MM.dd}"
}
# Echoes every event (full log content) to Logstash's stdout; useful while
# debugging this pipeline, noisy otherwise.
stdout { codec => rubydebug }
}