Hi everyone,
I'm currently facing an issue with the Filebeat Kafka input. The event flow is Data Source > Logstash > Kafka > Filebeat > Logstash > Elasticsearch. Kafka is used to handle event ingestion spikes and ES cluster failures as described here. I'd like to use Filebeat to parse the incoming logs with the built-in modules.
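For completeness, the Logstash on the consumer side (between Filebeat and Elasticsearch) is nothing special - just a beats input feeding an elasticsearch output, roughly like this (port and hosts here are placeholders, not my real values):

input {
  beats {
    port => 5044
  }
}
output {
  elasticsearch {
    hosts => ["http://es-node1:9200"]
  }
}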
The error I'm getting is related to JSON serialization, though I'm not sure where the issue comes from - the messages look fine at every step until they hit the Filebeat input. Here are the relevant error lines:
2021-12-31T10:26:47.767Z ERROR [file] fileout/file.go:128 Failed to serialize the event: unsupported
2021-12-31T10:26:47.767Z DEBUG [file] fileout/file.go:132 Failed event: &{{2021-12-31 10:26:46.708 +0000 UTC Not valid json: json: unsupported type: func() {"agent":{"ephemeral_id":"b1070c2d-fbe0-45e5-858e-6bfebaf62bbb","hostname":"ca3b5315a976","id":"9af1c238-66aa-415b-95f1-b51243ba8c60","name":"ca3b5315a976","type":"filebeat","version":"7.16.2"},"ecs":{"version":"1.12.0"},"host":"xxxxx","input":{"type":"kafka"},"kafka":{"headers":[],"key":"","offset":4,"partition":2,"topic":"cyberark"},"message":"2021-12-31T10:26:46.650Z server.domain.net why is this failing?"} <nil> false} 1 {map[]}}
This is the Logstash pipeline I'm using to produce to the Kafka topic:
input {
  tcp {
    port => 5146
    id => "ls-tag-1-cyberark-input"
  }
}
filter {
  mutate {
    id => "ls-tag-1-cyberark-add-customer-label"
    add_field => { "[labels][customer]" => "Customer Name" }
  }
}
output {
  kafka {
    id => "ls-tag-1-kafka-output-cyberark"
    client_id => "ls-tag-1"
    bootstrap_servers => "server1:9092,server2:9092,server3:9092"
    topic_id => "cyberark"
  }
}
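Jumping ahead a bit: in the second attempt I describe further down, the only producer-side change was adding a json codec to this output block:

output {
  kafka {
    id => "ls-tag-1-kafka-output-cyberark"
    client_id => "ls-tag-1"
    bootstrap_servers => "server1:9092,server2:9092,server3:9092"
    topic_id => "cyberark"
    # second attempt only: serialize the whole event as JSON instead of the default plain codec
    codec => json
  }
}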
I sent some lines to the Logstash host with nc like this:
[root@server4 ~]# nc server5 5146
this should definitely not be json
why is this failing?
They're correctly received by Logstash and pushed to Kafka:
/opt/kafka/bin/kafka-console-consumer.sh --topic cyberark --from-beginning --bootstrap-server localhost:9092
2021-12-31T10:07:29.837Z server6 this should definitely not be json
2021-12-31T10:26:46.650Z server6 why is this failing?
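The failed event above already shows kafka.key and kafka.headers as empty, but for double-checking keys and headers on the topic itself something like this should work (print.headers needs Kafka 2.7+, if I remember correctly):

/opt/kafka/bin/kafka-console-consumer.sh --topic cyberark --from-beginning \
  --bootstrap-server localhost:9092 \
  --property print.key=true --property print.headers=true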
This is the configuration of the Kafka topic:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topics-with-overrides
Topic: __consumer_offsets TopicId: qk6IHkcDRn6V_cR3vV57mg PartitionCount: 50 ReplicationFactor: 3 Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
Topic: cyberark TopicId: p2BhSdPMT2afRLz1T4-_Dw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
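The topic itself was created with plain defaults, something along the lines of:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic cyberark --partitions 3 --replication-factor 3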
And finally, this is the filebeat.yml with the Kafka input that produces the error:
filebeat.inputs:
  - type: kafka
    hosts:
      - server1:9092
      - server2:9092
      - server3:9092
    topics: ["cyberark"]
    group_id: "filebeat"
    # parsers:
    #   - ndjson:
    #       keys_under_root: true
    #       add_error_key: true
    #       message_key: log

output.file:
  path: "/usr/share/filebeat/kfk.out"

logging.level: debug
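For the second attempt I enabled the commented parser section; uncommented, it nests under the kafka input like this (message_key: log was a guess on my part):

filebeat.inputs:
  - type: kafka
    # hosts/topics/group_id unchanged from above
    parsers:
      - ndjson:
          keys_under_root: true
          add_error_key: true
          message_key: log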
That second attempt also had the Logstash Kafka output switched to the json codec (see the note under the pipeline above) - still no luck:
&{{2021-12-29 17:16:25.084 +0000 UTC Not valid json: json: unsupported type: func() {"@version":"1","agent":{"ephemeral_id":"3e041218-fee8-46b5-93cb-0ac6fd22c1b9","hostname":"ca3b5315a976","id":"9af1c238-66aa-415b-95f1-b51243ba8c60","name":"ca3b5315a976","type":"filebeat","version":"7.16.2"},"ecs":{"version":"1.12.0"},"host":"xxxxx","input":{"type":"kafka"},"kafka":{"headers":[],"key":"","offset":0,"partition":1,"topic":"cyberark"},"labels":{"customer":"Customer"},"message":"{\"port\":41658,\"host\":\"server1\",\"@timestamp\":\"2021-12-29T17:16:25.031Z\",\"@version\":\"1\",\"labels\":{\"customer\":\"Customer\"},\"message\":\"this is nice\"}","port":41658} <nil> false} 1 {map[]}}
I think that last part of the error message - <nil> false} 1 {map[]} - is added somewhere between Kafka and Filebeat and breaks the JSON parsing, though I don't understand where; the configuration seems pretty straightforward.
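The only concrete next step I can think of - and this is purely a guess - is dropping the kafka.* metadata before the output, to see whether one of those fields is what the serializer chokes on:

processors:
  - drop_fields:
      # guessing that one of the kafka metadata fields is the unserializable one
      fields: ["kafka.headers", "kafka.key"]
      ignore_missing: true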
Happy about any pointers!