Filebeat / Kafka bug?

Filebeat Version: 7.x (testing on 7.1.1 and 7.1.2)
Kafka Version: Azure Event Hubs Kafka surface

Logstash and Fluentd both work with the Event Hubs Kafka interface; Filebeat, not so much.

At a guess, it appears Event Hubs is not happy with how Filebeat is authenticating. As seen in the log snippet below, the Event Hub appears to be closing the connection abruptly. This happens many times per minute and seems to block Filebeat for some time.

Approximately every 1-3 minutes Filebeat successfully publishes one or two thousand events to the topic, then enters an endless loop of the auth/network-drop cycle below for another 1-3 minutes or more.

Since the other producers (Logstash, Fluentd) do not experience this, it seems to be a bug in Filebeat, or the Kafka library it uses, that triggers Event Hubs to RST the connection.

Anyone have any ideas?

Filebeat Config

  output:
    kafka:
      hosts:
        # Event Hubs exposes its Kafka endpoint on port 9093 (TLS required)
        - "azure_eh_kafka.servicebus.windows.net:9093"
      topic: test
      # Event Hubs requires Kafka protocol version 1.0.0 or later
      version: "1.0.0"
      ssl.enabled: true
      # SASL PLAIN: the username is the literal string "$ConnectionString",
      # the password is the Event Hubs connection string
      username: "$ConnectionString"
      password: "Endpoint=sb://azure_eh_kafka.servicebus.windows.net/;SharedAccessKeyName=<KEYNAME>;SharedAccessKey=<KEY>"
      compression: none

Filebeat logs

{"level":"info","timestamp":"2019-06-14T10:22:02.343Z","caller":"kafka/log.go:53","message":"kafka message: Successful SASL handshake"}
{"level":"info","timestamp":"2019-06-14T10:22:02.344Z","caller":"kafka/log.go:53","message":"SASL authentication successful with broker azure_eh_kafka.servicebus.windows.net:9093:4 - [0 0 0 0]\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.344Z","caller":"kafka/log.go:53","message":"Connected to broker at azure_eh_kafka.servicebus.windows.net:9093 (registered as #0)\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.375Z","caller":"kafka/log.go:53","message":"producer/broker/0 state change to [closing] because write tcp 10.32.nnn.nnn:41188->13.69.nnn.nnn:9093: write: connection reset by peer\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.375Z","caller":"kafka/log.go:53","message":"Error while closing connection to broker azure_eh_kafka.servicebus.windows.net:9093: write tcp 10.32.nnn.nnn:41188->13.69.nnn.nnn:9093: write: broken pipe\n"}

{"level":"info","timestamp":"2019-06-14T10:22:02.496Z","caller":"kafka/log.go:53","message":"SASL authentication successful with broker azure_eh_kafka.servicebus.windows.net:9093:4 - [0 0 0 0]\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.496Z","caller":"kafka/log.go:53","message":"Connected to broker at azure_eh_kafka.servicebus.windows.net:9093 (registered as #0)\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.763Z","caller":"kafka/log.go:53","message":"producer/broker/0 state change to [closing] because write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: connection reset by peer\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.763Z","caller":"kafka/log.go:53","message":"Error while closing connection to broker azure_eh_kafka.servicebus.windows.net:9093: write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: broken pipe\n"}
{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:251","message":"finished kafka batch"}
{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:265","message":"Kafka publish failed with: write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: connection reset by peer"}
{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:251","message":"finished kafka batch"}
{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:265","message":"Kafka publish failed with: write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: connection reset by peer"}

This appears to be caused by:

https://github.com/elastic/beats/pull/12254/files#diff-ea065a113b39ad622b84e2dcedd8ffeeR215

Compiling from master and setting bulk_max_size to something reasonable, it appears to be working perfectly now.

I had tried setting bulk_max_size in the 7.1.x versions, but that didn't help.
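
For reference, the override looks like this (2048 is just an illustrative value, not a tuned recommendation):

  output:
    kafka:
      # ... same settings as above ...
      # example value only; anything reasonable appears to work on a build from master
      bulk_max_size: 2048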

Looking at the code, overriding the bulk_max_size value in the configuration should have the same behavior as hardcoding the value. I am surprised you don't get the same behavior, because all settings are read and unpacked at the same time.
