Filebeat cannot connect to kafka with SASL_PLAIN authentication enabled?

My filebeat cannot connect to kafka with SASL_PLAIN authentication enabled,and error massage is:

2023-02-10T11:59:56.014+0800	ERROR	[kafka]	kafka/client.go:317	Kafka (topic=test-logs): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

after logs:

2023-02-13T22:13:17.639+0800    INFO    [publisher_pipeline_output]     pipeline/output.go:143  Connecting to kafka(172.16.129.82:9092)
2023-02-13T22:13:17.639+0800    INFO    [publisher_pipeline_output]     pipeline/output.go:151  Connection to kafka(172.16.129.82:9092) established

Filebeat Version: 7.15.1
kafka Version: 2.8.1
filebeat output config:

output.kafka:
  enable: true
  hosts: '172.xx.xx.xx:9092'
  version: '1.0.0'
  topic: '%{[fields][logTopic]}'
  partition.round_robin:
    reachable_only: false
  compression: gzip
  max_message_bytes: 102400
  ssl.enable: false
  sasl.mechanism: PLAIN
  username: admin
  password: xxxxxxxx

I I changed the source code libbeat/outputs/kafka/kafka.go and recompiled filebeat to print the sarama log。

sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)}

replace by:

sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

and I see some sarama log:

[sarama] 2023/02/13 22:15:01 Initializing new client
[sarama] 2023/02/13 22:15:01 Successfully initialized new client
[sarama] 2023/02/13 22:15:01 client/metadata fetching metadata for [vcloud-running-logs] from broker 172.xx.xx.xx:9092
[sarama] 2023/02/13 22:15:01 Failed to send SASL handshake 172.xx.xx.xx:9092: read tcp 172.xxx.xxx.xxx:50022->172.xx.xx.xx:9092: read: connection reset by peer
[sarama] 2023/02/13 22:15:01 Error while performing SASL handshake 172.xx.xx.xx:9092
[sarama] 2023/02/13 22:15:01 Closed connection to broker 172.xx.xx.xx:9092
[sarama] 2023/02/13 22:15:01 client/metadata got error from broker -1 while fetching metadata: read tcp 172.16.128.33:50022->172.xx.xx.xx:9092: read: connection reset by peer
[sarama] 2023/02/13 22:15:01 client/metadata no available broker to send metadata request to
[sarama] 2023/02/13 22:15:01 client/brokers resurrecting 1 dead seed brokers
[sarama] 2023/02/13 22:15:01 client/metadata retrying after 250ms... (3 attempts remaining)

kafka server log print:

[2023-02-13 22:16:05,854] DEBUG connections.max.reauth.ms for mechanism=PLAIN: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2023-02-13 22:16:05,854] WARN [SocketServer listenerType=ZK_BROKER, nodeId=0] Unexpected error from /172.1.2.4; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369295616 larger than 524288)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
        at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:257)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
        at kafka.network.Processor.poll(SocketServer.scala:989)
        at kafka.network.Processor.run(SocketServer.scala:892)
        at java.lang.Thread.run(Thread.java:750)

but when i use a demo with also using sarama to connecting kafka, it is work fine.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.