Filebeat / Kafka bug?

Filebeat Version: 7.x (testing on 7.1.1 and 7.1.2)
Kafka Version: Azure Event Hubs Kafka surface

Logstash and Fluentd both work with the Event Hubs Kafka interface; Filebeat does not work reliably.

At a guess, Event Hubs is not happy with how Filebeat authenticates. As seen in the log snippet below, Event Hubs appears to close the connection abruptly, within milliseconds of a successful SASL authentication. This happens many times per minute and seems to block Filebeat for some time.

Roughly every 1-3 minutes, Filebeat successfully publishes one or two thousand events to the topic, and then falls back into the auth/connection-reset cycle shown below for another 1-3 minutes or more.

Since other producers (Logstash and Fluentd) do not experience this, it looks like a bug in Filebeat, or in the Kafka library it uses, that triggers Event Hubs to reset (RST) the connection.

Does anyone have any ideas?

Filebeat Config

  output:
    kafka:
      hosts:
        - "azure_eh_kafka.servicebus.windows.net:9093"
      topic: test
      version: "1.0.0"
      ssl.enabled: true
      username: "$ConnectionString"
      password: "Endpoint=sb://azure_eh_kafka.servicebus.windows.net/;SharedAccessKeyName=<KEYNAME>;SharedAccessKey=<KEY>"
      compression: none

Filebeat logs

{"level":"info","timestamp":"2019-06-14T10:22:02.343Z","caller":"kafka/log.go:53","message":"kafka message: Successful SASL handshake"}
{"level":"info","timestamp":"2019-06-14T10:22:02.344Z","caller":"kafka/log.go:53","message":"SASL authentication successful with broker azure_eh_kafka.servicebus.windows.net:9093:4 - [0 0 0 0]\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.344Z","caller":"kafka/log.go:53","message":"Connected to broker at azure_eh_kafka.servicebus.windows.net:9093 (registered as #0)\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.375Z","caller":"kafka/log.go:53","message":"producer/broker/0 state change to [closing] because write tcp 10.32.nnn.nnn:41188->13.69.nnn.nnn:9093: write: connection reset by peer\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.375Z","caller":"kafka/log.go:53","message":"Error while closing connection to broker azure_eh_kafka.servicebus.windows.net:9093: write tcp 10.32.nnn.nnn:41188->13.69.nnn.nnn:9093: write: broken pipe\n"}

{"level":"info","timestamp":"2019-06-14T10:22:02.496Z","caller":"kafka/log.go:53","message":"SASL authentication successful with broker azure_eh_kafka.servicebus.windows.net:9093:4 - [0 0 0 0]\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.496Z","caller":"kafka/log.go:53","message":"Connected to broker at azure_eh_kafka.servicebus.windows.net:9093 (registered as #0)\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.763Z","caller":"kafka/log.go:53","message":"producer/broker/0 state change to [closing] because write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: connection reset by peer\n"}
{"level":"info","timestamp":"2019-06-14T10:22:02.763Z","caller":"kafka/log.go:53","message":"Error while closing connection to broker azure_eh_kafka.servicebus.windows.net:9093: write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: broken pipe\n"}
{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:251","message":"finished kafka batch"}
{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:265","message":"Kafka publish failed with: write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: connection reset by peer"}
{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:251","message":"finished kafka batch"}
{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:265","message":"Kafka publish failed with: write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: connection reset by peer"}
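In case it helps anyone quantify the cycle described above, here is a rough Python sketch that tallies successful SASL authentications and broker connection resets per minute from Filebeat's JSON log lines. The names `tally_events` and `SAMPLE` are made up for illustration, and the matching relies only on the message strings visible in the snippet above, so treat it as a starting point rather than a proper log parser.

```python
import json
from collections import Counter

# Three lines copied from the log snippet above (raw strings keep the JSON "\n" escapes).
SAMPLE = [
    r'{"level":"info","timestamp":"2019-06-14T10:22:02.344Z","caller":"kafka/log.go:53","message":"SASL authentication successful with broker azure_eh_kafka.servicebus.windows.net:9093:4 - [0 0 0 0]\n"}',
    r'{"level":"info","timestamp":"2019-06-14T10:22:02.375Z","caller":"kafka/log.go:53","message":"producer/broker/0 state change to [closing] because write tcp 10.32.nnn.nnn:41188->13.69.nnn.nnn:9093: write: connection reset by peer\n"}',
    r'{"level":"debug","timestamp":"2019-06-14T10:22:02.767Z","logger":"kafka","caller":"kafka/client.go:265","message":"Kafka publish failed with: write tcp 10.32.nnn.nnn:41190->13.69.nnn.nnn:9093: write: connection reset by peer"}',
]


def tally_events(lines):
    """Count SASL successes and broker resets per minute (hypothetical helper)."""
    counts = {"auth_ok": Counter(), "reset": Counter()}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # the log snippet contains blank separator lines
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore anything that is not a JSON log line
        if not isinstance(entry, dict):
            continue
        message = entry.get("message", "")
        minute = entry.get("timestamp", "")[:16]  # e.g. "2019-06-14T10:22"
        if "SASL authentication successful" in message:
            counts["auth_ok"][minute] += 1
        elif "state change to [closing]" in message and "connection reset by peer" in message:
            # Count only the broker state change, so the follow-up
            # "Kafka publish failed" lines are not double counted.
            counts["reset"][minute] += 1
    return counts


result = tally_events(SAMPLE)
for kind, per_minute in result.items():
    for minute, n in sorted(per_minute.items()):
        print(f"{minute} {kind}: {n}")
```

To run it against a real log, pass an open file instead of `SAMPLE`, for example `tally_events(open("filebeat.log"))`; the file name here is only a placeholder.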