We have an issue where Kafka does not receive the full JSON message. We can see the JSON messages and tags in Logstash's file output, but when we view the message in Kafka it is missing the Filebeat header that was prepended.
We had a similar issue when going lumberjack to lumberjack, and we were able to resolve it by placing the codec at the correct point in the pipeline.
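For clarity, by "the correct point in the pipeline" we mean whether the codec sits on the input or on the output. A rough illustration using our current plugins (not the old lumberjack config):

input {
  beats {
    port => 5044
    codec => json    # codec on the input: decode payloads as they arrive
  }
}
output {
  kafka {
    bootstrap_servers => "10.10.10.100:9092"
    topic_id => "microservice"
    codec => json    # codec on the output: encode events as they leave
  }
}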
Messages in Logstash:
{"@timestamp":"2019-02-22T16:10:29.383Z","@version":"1","offset":1571,"tags":["dev","localhost_access","tomcat","app","beats_input_codec_plain_applied"],"beat":{"hostname":"youre_it","version":"6.3.2","name":"youre_it"},"source":"/opt/lock/logs/localhost_access_log.txt","message":"10.10.10.133 - - [22/Feb/2019:16:10:18 +0000] "GET /lock/rest/healthCheck HTTP/1.1" 200 28","prospector":{"type":"log"},"input":{"type":"log"},"host":{"name":"youre_it"}}
When we view the message in Kafka, it is missing this leading portion, which is tagging info we need in our parsers and for segregating dashboards between environments:
{"@timestamp":"2019-02-22T16:10:29.383Z","@version":"1","offset":1571,"tags":["dev","localhost_access","tomcat","app","beats_input_codec_plain_applied"],"beat":
Instead, the messages in Kafka appear to have a new header prepended with a timestamp and hostname:
2019-02-22T22:21:04.600Z {name=youre_it} 2019-02-22T22:21:01+0000
That prepended header looks like the plain codec's default encode format (timestamp, host, message), so we tried codecs in several different working configurations, but nothing changed this result.
Below is our current pipeline config:
input {
  beats {
    port => 5044
    ssl => false
    client_inactivity_timeout => 300
  }
}
output {
  kafka {
    acks => "all"
    bootstrap_servers => "10.10.10.100:9092"
    codec => plain
    topic_id => "microservice"
  }
}
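For example, one variant along these lines (illustrative only; we tried several permutations of codec settings like this) swapped the output codec from plain to json:

output {
  kafka {
    acks => "all"
    bootstrap_servers => "10.10.10.100:9092"
    codec => json    # swapped from plain; json is a built-in Logstash codec
    topic_id => "microservice"
  }
}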
And before anyone asks: yes, we are using Filebeat to Logstash and then Logstash to Kafka. The reason for the Logstash instance in front of Kafka is that we use it as a bridge across a VPN to Kafka in another datacenter, where we then pick the messages up with a second Logstash to parse and ingest into Elasticsearch.
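For completeness, the far side looks roughly like this (a sketch only; the hostnames and index name are placeholders, not our real values):

input {
  kafka {
    bootstrap_servers => "kafka.remote-dc.example:9092"    # placeholder address
    topics => ["microservice"]
    codec => json
  }
}
filter {
  # tag-based parsing and dashboard routing happens here
}
output {
  elasticsearch {
    hosts => ["http://es.remote-dc.example:9200"]          # placeholder address
    index => "microservice-%{+YYYY.MM.dd}"
  }
}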
Thank you in advance