My Filebeat cannot connect to Kafka with SASL/PLAIN authentication enabled, and the error message is:
2023-02-10T11:59:56.014+0800 ERROR [kafka] kafka/client.go:317 Kafka (topic=test-logs): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
The error appears right after these logs:
2023-02-13T22:13:17.639+0800 INFO [publisher_pipeline_output] pipeline/output.go:143 Connecting to kafka(172.16.129.82:9092)
2023-02-13T22:13:17.639+0800 INFO [publisher_pipeline_output] pipeline/output.go:151 Connection to kafka(172.16.129.82:9092) established
Filebeat version: 7.15.1
Kafka version: 2.8.1
Filebeat output config:
output.kafka:
  enable: true
  hosts: '172.xx.xx.xx:9092'
  version: '1.0.0'
  topic: '%{[fields][logTopic]}'
  partition.round_robin:
    reachable_only: false
  compression: gzip
  max_message_bytes: 102400
  ssl.enable: false
  sasl.mechanism: PLAIN
  username: admin
  password: xxxxxxxx
I changed the source code in libbeat/outputs/kafka/kafka.go and recompiled Filebeat to print the sarama log, replacing:
sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)}
with:
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
and now I can see the sarama logs:
[sarama] 2023/02/13 22:15:01 Initializing new client
[sarama] 2023/02/13 22:15:01 Successfully initialized new client
[sarama] 2023/02/13 22:15:01 client/metadata fetching metadata for [vcloud-running-logs] from broker 172.xx.xx.xx:9092
[sarama] 2023/02/13 22:15:01 Failed to send SASL handshake 172.xx.xx.xx:9092: read tcp 172.xxx.xxx.xxx:50022->172.xx.xx.xx:9092: read: connection reset by peer
[sarama] 2023/02/13 22:15:01 Error while performing SASL handshake 172.xx.xx.xx:9092
[sarama] 2023/02/13 22:15:01 Closed connection to broker 172.xx.xx.xx:9092
[sarama] 2023/02/13 22:15:01 client/metadata got error from broker -1 while fetching metadata: read tcp 172.16.128.33:50022->172.xx.xx.xx:9092: read: connection reset by peer
[sarama] 2023/02/13 22:15:01 client/metadata no available broker to send metadata request to
[sarama] 2023/02/13 22:15:01 client/brokers resurrecting 1 dead seed brokers
[sarama] 2023/02/13 22:15:01 client/metadata retrying after 250ms... (3 attempts remaining)
The Kafka server log prints:
[2023-02-13 22:16:05,854] DEBUG connections.max.reauth.ms for mechanism=PLAIN: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2023-02-13 22:16:05,854] WARN [SocketServer listenerType=ZK_BROKER, nodeId=0] Unexpected error from /172.1.2.4; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369295616 larger than 524288)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:257)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at kafka.network.Processor.poll(SocketServer.scala:989)
at kafka.network.Processor.run(SocketServer.scala:892)
at java.lang.Thread.run(Thread.java:750)
But when I use a standalone demo program that also uses sarama to connect to Kafka, it works fine.