I am working with beats, logstash, elasticsearch and kafka, and it appears that the logstash file output may be dropping messages.
I started with a simple filebeat-to-logstash setup to work on my filtering. That logstash just wrote to a file, and I noticed that the file created by filebeat would have the expected number of messages while the file created by logstash could have fewer. I tested by concatenating a known test file onto the file being watched by filebeat. Since I was working on a filter, I thought the losses might be something I was doing, but I saw nothing in the logstash error log, and events with grok parse failures were still written to the output.
I figured I would move on, build out my test environment, and eventually fix the dropped-message problem. The new test went from filebeat to logstash to kafka to logstash. As before, filebeat and both logstashes logged to a file in addition to outputting to logstash and kafka respectively. Again messages were being dropped, but I found a case that makes me think the issue is with the file output: there were instances where a message was in the filebeat file and in the file from the logstash that read from kafka, but not in the file from the logstash that loaded kafka. How can a message appear on the output side of kafka but not on the input side, when I cleared the files and restarted filebeat and both logstashes before each test?
Can the file output have a threading or some other issue? I did not change any of the threading parameters in either logstash. Or is the issue that I am using cat, so filebeat picks up all of my messages at essentially the same time? My test file is only 85 lines, and filebeat finds 22 messages in it; 2 of the messages are multi-line.
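One thing I have not tried is explicitly pinning the file output to a single worker thread to rule out concurrent writes. A minimal sketch, assuming the common workers output option is accepted by the file plugin in my logstash version:

output {
  file {
    path => "/var/log/logstash/beats-test-%{+e}.log"
    codec => rubydebug
    # Force a single writer thread so concurrent appends cannot interleave or drop lines.
    workers => 1
  }
}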
Here is the logstash configuration that listens for beats and loads kafka.
input {
  beats {
    port => 5044
  }
}

output {
  file {
    path => "/var/log/logstash/beats-test-%{+e}.log"
    codec => rubydebug
  }

  kafka {
    topic_id => "logstash"
    bootstrap_servers => "server01:9092,server02:9092"
  }
}
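If buffering in the file output is part of the problem, forcing a flush after every event might rule it out. A sketch, assuming the file output's flush_interval option works as documented (0 means flush on every message):

output {
  file {
    path => "/var/log/logstash/beats-test-%{+e}.log"
    codec => rubydebug
    # Flush on every event instead of the default 2-second interval so an
    # abrupt shutdown cannot lose buffered lines.
    flush_interval => 0
  }
}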
Here are the input and output sections of the logstash configuration that reads from kafka.
input {
  kafka {
    group_id => "logstash-kafka"
    topic_id => "logstash"
    zk_connect => "server01:2181,server02:2181"
  }
}

filter {
  # Filter using grok, ruby, kv, mutate, match, geoip, useragent, prune, ...
}

output {
  file {
    path => "/var/log/logstash/kafka-test-%{+e}.log"
    codec => rubydebug
  }

  if "_grokparsefailure" in [tags] {
    file {
      path => "/var/log/logstash/logstash-kafka-grok-failure-%{+YYYY-MM-dd}.log"
      codec => rubydebug
    }
  } else {
    # Output to elasticsearch
  }
}
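To trace individual messages through the pipeline, I am also considering turning on event decoration on the kafka input so each event records where it came from. A sketch, assuming the decorate_events option is available in this version of the kafka input plugin:

input {
  kafka {
    group_id => "logstash-kafka"
    topic_id => "logstash"
    zk_connect => "server01:2181,server02:2181"
    # Add a kafka field (topic, consumer_group, partition, offset, msg_size) to
    # each event so a message in one output file can be matched to the other.
    decorate_events => true
  }
}

With that in place I could grep both file outputs for the same offset and see exactly where a message disappears.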
Wes.