Filebeat > Logstash > Kafka

We have an issue where Kafka does not receive the full JSON message. We can see the JSON messages and tags in the file output from Logstash, but when we view the message in Kafka it is missing the Filebeat header that was appended.

We had a similar issue when going lumberjack-to-lumberjack, and we were able to resolve it by placing the codec at the correct point in the pipeline.

Message as seen in Logstash:

{"@timestamp":"2019-02-22T16:10:29.383Z","@version":"1","offset":1571,"tags":["dev","localhost_access","tomcat","app","beats_input_codec_plain_applied"],"beat":{"hostname":"youre_it","version":"6.3.2","name":"youre_it"},"source":"/opt/lock/logs/localhost_access_log.txt","message":"10.10.10.133 - - [22/Feb/2019:16:10:18 +0000] "GET /lock/rest/healthCheck HTTP/1.1" 200 28","prospector":{"type":"log"},"input":{"type":"log"},"host":{"name":"youre_it"}}

When we view the message in Kafka, it is missing the following, which is the tagging info we need for our parsers and for segregating dashboards between environments:

{"@timestamp":"2019-02-22T16:10:29.383Z","@version":"1","offset":1571,"tags":["dev","localhost_access","tomcat","app","beats_input_codec_plain_applied"],"beat":

Instead, the messages in Kafka seem to get a new header with a timestamp and hostname:

2019-02-22T22:21:04.600Z {name=youre_it} 2019-02-22T22:21:01+0000

We have tried codecs in several different configurations and nothing seems to change this result.

Below is our pipeline config:

input {
  beats {
    port => 5044
    ssl  => false
    client_inactivity_timeout => 300
  }
}

output {
  kafka {
    acks => "all"
    bootstrap_servers => "10.10.10.100:9092"
    codec => plain
    topic_id => "microservice"
  }
}

And before anyone asks: yes, we are deliberately going Filebeat to Logstash and then Logstash to Kafka. The reason for the Logstash hop before Kafka is that we are using it as a bridge across a VPN to a Kafka cluster in another datacenter, where we will then pick the messages up with another Logstash instance, parse them, and ingest them into Elasticsearch.

Thank you in advance

Your output to Kafka is using codec => plain; if you want it to send the fully-structured event as JSON, then you need to use codec => json.
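
As a minimal sketch, reusing the broker and topic from your posted config, only the codec line needs to change:

output {
  kafka {
    acks => "all"
    bootstrap_servers => "10.10.10.100:9092"
    codec => json            # serialize the whole structured event, tags and all
    topic_id => "microservice"
  }
}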

"We have tried codecs in several different working configurations and nothing seems to change this result."
This means we have tried json, json_lines and plain with no luck, in the input and output sections of the config.

So you would think that json or json_lines would have worked, but it has not.
We do get JSON-formatted messages when we use those codecs, but unfortunately we do not get the header part of the JSON message.
It is not the full message as received by Logstash; it is missing the header added by Filebeat, which contains the tags we set in the Filebeat configuration.

To me it also looks like the kafka output plugin drops the Filebeat header and adds a timestamp and hostname to the message; either way, the Filebeat header is no longer there when it reaches Kafka.

Hope this helps; let me know if I can provide anything more.

I am not familiar with anything called a "header", so it is not clear to me what is not working as expected.

  • when using codec => plain, the Plain Codec stringifies the event in a simplistic way, or uses the provided { format => "" } to sprintf certain fields from the event into a single string (see the sketch after this list).
  • when using codec => json, the entire structured event is output as a line in the output.
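
That simplistic stringification is what produces lines like "2019-02-22T22:21:04.600Z {name=youre_it} ...": roughly the timestamp, the host, and then the message field. As a sketch, reusing the broker and topic from the config above (the explicit format string illustrates the behavior and is not a documented default):

output {
  kafka {
    bootstrap_servers => "10.10.10.100:9092"
    topic_id => "microservice"
    # roughly what the plain codec's fallback stringification looks like,
    # made explicit via the format option
    codec => plain { format => "%{@timestamp} %{host} %{message}" }
  }
}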

There is a reserved @metadata field within a Logstash event, which contains a variety of source metadata, but this reserved field is for pipeline-internal use and is not included in output codecs by design. If it is in fact the metadata that you are missing (your description doesn't clearly indicate this), then it can be copied to a non-reserved field prior to output, which would cause the JSON codec to include it:

filter {
  mutate {
    # copy the reserved @metadata field to a plain field so that
    # output codecs (e.g. json) will serialize it with the event
    copy => { "[@metadata]" => "[metadata]" }
  }
}

However, if you are merely using Logstash as a forwarder (i.e., not using filters to extract information from and enrich the events), then you may be interested in configuring Filebeat to send its events directly to Kafka.
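
A minimal sketch of that approach in filebeat.yml, assuming the same broker and topic as the Logstash output above:

output.kafka:
  # assumption: same broker and topic as the Logstash kafka output
  hosts: ["10.10.10.100:9092"]
  topic: "microservice"
  # -1 waits for all replicas, comparable to acks => "all" in Logstash
  required_acks: -1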

OK, so maybe it is not called a header, but it is data that Filebeat appends to the actual message.

What I'm referring to is the leading section of the message below (everything from "@timestamp" up to and including the "beat" object), as this data never makes it to Kafka:

{"@timestamp":"2019-02-22T16:10:29.383Z","@version":"1","offset":1571,"tags":["dev","localhost_access","tomcat","app","beats_input_codec_plain_applied"],"beat":{"hostname":"youre_it","version":"6.3.2","name":"youre_it"},"source":"/opt/lock/logs/localhost_access_log.txt","message":"10.10.10.133 - - [22/Feb/2019:16:10:18 +0000] "GET /lock/rest/healthCheck HTTP/1.1" 200 28","prospector":{"type":"log"},"input":{"type":"log"},"host":{"name":"youre_it"}}

This topic can be closed, as this seems to have resolved itself. I don't know what happened, but I have reset the codec to json_lines and now the data appears in the messages. My only assumption is that the pipeline may not have been reloading properly.
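
For the record, the working output presumably ended up looking like the sketch below (the same kafka block with only the codec changed; the final config was not posted):

output {
  kafka {
    acks => "all"
    bootstrap_servers => "10.10.10.100:9092"
    codec => json_lines      # newline-delimited JSON, full event included
    topic_id => "microservice"
  }
}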
