Filebeat Kafka input not able to process messages

Hi everyone,

I'm currently facing an issue with the Filebeat Kafka input. The event flow is Data Source > Logstash > Kafka > Filebeat > Logstash > Elasticsearch. Kafka is used to handle event ingestion spikes and ES cluster failures as described here. I'd like to use Filebeat to parse the incoming logs with the built-in modules.

The error I'm getting is related to JSON serialization, though I'm not sure where the issue comes from - the messages look fine at every step except the Filebeat input. Here are the relevant error lines:

2021-12-31T10:26:47.767Z        ERROR   [file]  fileout/file.go:128     Failed to serialize the event: unsupported
2021-12-31T10:26:47.767Z        DEBUG   [file]  fileout/file.go:132     Failed event: &{{2021-12-31 10:26:46.708 +0000 UTC Not valid json: json: unsupported type: func() {"agent":{"ephemeral_id":"b1070c2d-fbe0-45e5-858e-6bfebaf62bbb","hostname":"ca3b5315a976","id":"9af1c238-66aa-415b-95f1-b51243ba8c60","name":"ca3b5315a976","type":"filebeat","version":"7.16.2"},"ecs":{"version":"1.12.0"},"host":"xxxxx","input":{"type":"kafka"},"kafka":{"headers":[],"key":"","offset":4,"partition":2,"topic":"cyberark"},"message":"2021-12-31T10:26:46.650Z server.domain.net why is this failing?"} <nil> false} 1 {map[]}}

This is the Logstash pipeline I'm using to produce to the Kafka topic:

input {
  tcp {
    port => 5146
    id => "ls-tag-1-cyberark-input"
  }
}

filter {
  mutate {
    id => "ls-tag-1-cyberark-add-customer-label"
    add_field => { "[labels][customer]" => "Customer Name" }
  }
}

output {
  kafka {
    id => "ls-tag-1-kafka-output-cyberark"
    client_id => "ls-tag-1"
    bootstrap_servers => "server1:9092,server2:9092,server3:9092"
    topic_id => "cyberark"
  }
}

I sent some test lines to the Logstash host with nc, like this:

[root@server4 ~]# nc server5 5146
this should definitely not be json
why is this failing?

They're correctly received by Logstash and pushed to Kafka - the Logstash Kafka output defaults to the plain codec, which is why each line gets the timestamp and host prefix:

/opt/kafka/bin/kafka-console-consumer.sh --topic cyberark --from-beginning --bootstrap-server localhost:9092

2021-12-31T10:07:29.837Z server6 this should definitely not be json
2021-12-31T10:26:46.650Z server6 why is this failing?

This is the configuration of the Kafka topic:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topics-with-overrides
Topic: __consumer_offsets       TopicId: qk6IHkcDRn6V_cR3vV57mg PartitionCount: 50      ReplicationFactor: 3    Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
Topic: cyberark      TopicId: p2BhSdPMT2afRLz1T4-_Dw PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
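
For completeness, the cyberark topic was created with stock settings - roughly like this (the exact flags are reconstructed, not the original command):

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic cyberark --partitions 3 --replication-factor 3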

And finally, this is the filebeat.yml with the Kafka input that produces the error:

filebeat.inputs:
  - type: kafka
    hosts:
      - server1:9092
      - server2:9092
      - server3:9092
    topics: ["cyberark"]
    group_id: "filebeat"
#    parsers:
#      - ndjson:
#          keys_under_root: true
#          add_error_key: true
#          message_key: log


output.file:
  path: "/usr/share/filebeat/kfk.out"

logging.level: debug
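
For reference, Filebeat runs in a container and I start it in the foreground to reproduce - roughly like this (the config path is the container default, so treat it as an assumption):

filebeat -e -c /usr/share/filebeat/filebeat.yml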

I tried the same with the Logstash Kafka output set to the json codec, and with the ndjson parser enabled (the commented-out block in the filebeat.yml above). Roughly, the two changes looked like this (a sketch from memory, not the exact files):
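
output {
  kafka {
    id => "ls-tag-1-kafka-output-cyberark"
    client_id => "ls-tag-1"
    bootstrap_servers => "server1:9092,server2:9092,server3:9092"
    topic_id => "cyberark"
    codec => json
  }
}

    parsers:
      - ndjson:
          keys_under_root: true
          add_error_key: true
          message_key: log

Still no luck - the failed event now looks like this: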

&{{2021-12-29 17:16:25.084 +0000 UTC Not valid json: json: unsupported type: func() {"@version":"1","agent":{"ephemeral_id":"3e041218-fee8-46b5-93cb-0ac6fd22c1b9","hostname":"ca3b5315a976","id":"9af1c238-66aa-415b-95f1-b51243ba8c60","name":"ca3b5315a976","type":"filebeat","version":"7.16.2"},"ecs":{"version":"1.12.0"},"host":"xxxxx","input":{"type":"kafka"},"kafka":{"headers":[],"key":"","offset":0,"partition":1,"topic":"cyberark"},"labels":{"customer":"Customer"},"message":"{\"port\":41658,\"host\":\"server1\",\"@timestamp\":\"2021-12-29T17:16:25.031Z\",\"@version\":\"1\",\"labels\":{\"customer\":\"Customer\"},\"message\":\"this is nice\"}","port":41658} <nil> false} 1 {map[]}}

I think the last part of the error message - <nil> false} 1 {map[]} - is somehow added at some point between Kafka and Filebeat and breaks the JSON parsing, though I don't understand where; the configuration seems pretty straightforward.

Happy about any pointers 🙂

The same Kafka topic can be read by a Logstash Kafka input without issue, so I assume something in the Filebeat configuration is not working as it should. Still happy for any pointers - the Logstash workaround is rather clunky: it would require routing from Logstash to Filebeat to use the CEF processor, then back to Logstash to use multiple outputs, since the documents have to end up in two separate clusters.
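
For comparison, the Logstash consumer that reads the same topic without problems is essentially this (a minimal sketch; the group_id is an assumption):

input {
  kafka {
    bootstrap_servers => "server1:9092,server2:9092,server3:9092"
    topics => ["cyberark"]
    group_id => "logstash"
  }
}

output {
  stdout { codec => rubydebug }
}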

Hi,
I had the same issue with Filebeat 7.16.2; with version 7.15.0 it works like a charm.


Thanks for the hint, Franck! I redeployed with Filebeat 7.15.2 and it worked perfectly. I'll open an issue on the Beats GitHub for this.

GitHub issue link: [Filebeat] Kafka Input not working in 7.16.2 · Issue #29746 · elastic/beats · GitHub
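
In case anyone else hits this before the bug is fixed: pinning the container image to the last known-good version was enough for me, e.g. (assuming the official Elastic registry):

docker pull docker.elastic.co/beats/filebeat:7.15.2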
