Unable to send data to AWS MSK. Error message: Kafka publish failed with: circuit breaker is open

Hello,

I have configured a pipeline that reads the application logs and sends the log messages to individual Kafka topics.
I have installed Filebeat 7.10.0 and configured AWS MSK 2.2.1. After starting the Filebeat service, I get the debug message "Kafka publish failed with: circuit breaker is open". In the Kafka output section of the Filebeat config, I have set hosts, the topic name and worker: 3.

I have checked my config and output with the commands below:
sudo filebeat test config
sudo filebeat test output
Both report status OK.

Could you please help me find what I need to change in the config so that the Kafka circuit breaker issue is resolved and data can be sent to the AWS MSK cluster?

Debug Log :

2021-02-09T16:51:37.093Z INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer
2021-02-09T16:51:37.093Z INFO [publisher] pipeline/retry.go:223 done
2021-02-09T16:51:37.096Z DEBUG [kafka] kafka/client.go:277 finished kafka batch
2021-02-09T16:51:37.096Z DEBUG [kafka] kafka/client.go:291 Kafka publish failed with: circuit breaker is open
2021-02-09T16:51:37.096Z INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer
2021-02-09T16:51:37.096Z INFO [publisher] pipeline/retry.go:223 done
2021-02-09T16:51:37.102Z INFO [monitoring] log/log.go:153 Total non-zero metrics {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":197820,"time":{"ms":197824}},"total":{"ticks":4944570,"time":{"ms":4944580},"value":4944570},"user":{"ticks":4746750,"time":{"ms":4746756}}},"handles":{"limit":{"hard":4096,"soft":1024},"open":10},"info":{"ephemeral_id":"c0be9d23-f6c7-44cf-acb4-2a91c811a892","uptime":{"ms":5114037}},"memstats":{"gc_next":22670736,"memory_alloc":19667176,"memory_total":278550823912,"rss":62857216},"runtime":{"goroutines":19}},"filebeat":{"events":{"active":101,"added":102,"done":1},"harvester":{"closed":1,"open_files":0,"running":0,"started":1}},"libbeat":{"config":{"module":{"running":0},"reloads":1,"scans":1},"output":{"events":{"active":100,"batches":1176131,"failed":117613000,"total":117613100},"type":"kafka"},"outputs":{"kafka":{"bytes_read":118692,"bytes_write":559548}},"pipeline":{"clients":0,"events":{"active":100,"filtered":2,"published":100,"retry":117613100,"total":102}}},"registrar":{"states":{"current":1,"update":1},"writes":{"success":1,"total":1}},"system":{"cpu":{"cores":2},"load":{"1":5.64,"15":5.88,"5":5.86,"norm":{"1":2.82,"15":2.94,"5":2.93}}}}}}
2021-02-09T16:51:37.102Z INFO [monitoring] log/log.go:154 Uptime: 1h25m14.040960117s
2021-02-09T16:51:37.102Z INFO [monitoring] log/log.go:131 Stopping metrics logging.
2021-02-09T16:51:37.102Z INFO instance/beat.go:461 filebeat stopped.

Hi All,
I am unable to post data from the Filebeat agent to AWS MSK. Initially I was using Filebeat version 7.10.0.
On the Elastic team's advice, I upgraded to 7.11.0.
I am using a TLS configuration; please find it below.
Before testing with the Filebeat agent, I created a topic and was able to produce and consume data using AWS CLI.
I used the same TLS config in Filebeat, along with the topic name and broker details. It shows the error "client has run out of available brokers to talk to".

To confirm whether the brokers are available, I used the telnet command and was able to connect from the client machine.
I have a doubt about the JKS file: can we use it directly, or do we have to provide a PEM file for TLS encryption?
Topic :-
./kafka-console-producer.sh --broker-list b-1.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094,b-2.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094,b-3.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094 --producer.config client.properties --topic sktopic

Producer:-
[root@ip-172-31-4-58 bin]# ./kafka-console-producer.sh --broker-list b-1.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094,b-2.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094,b-3.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094 --producer.config client.properties --topic sktopic

hi
hello
how r u ?

Consumer :-

[ec2-user@ip-172-31-4-58 software]$ cd kafka_2.12-2.2.1/
[ec2-user@ip-172-31-4-58 kafka_2.12-2.2.1]$ cd bin
[ec2-user@ip-172-31-4-58 bin]$ ./kafka-console-consumer.sh --bootstrap-server b-1.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094,b-2.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094,b-3.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094 --consumer.config client.properties --topic sktopic --from-beginning
hi
hello
how r u ?

filebeat.yml :-
#-------------------------------- Kafka Output --------------------------------
output.kafka:
  enabled: true
  hosts: [ "b-1.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094",
           "b-2.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094",
           "b-3.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094" ]
  topic: "sktopic"

  # Use TLS settings for encryption
  tls.certificate_authorities: ["/tmp/kafka.client.truststore.jks"]
  #ssl.enabled: false

Error Messages :-

2021-02-15T16:31:50.801Z DEBUG [input] input/input.go:139 Run input
2021-02-15T16:31:50.802Z DEBUG [input] log/input.go:205 Start next scan
2021-02-15T16:31:50.802Z DEBUG [input] log/input.go:439 Check file for harvesting: /var/tmp/dummy_log_4.log
2021-02-15T16:31:50.802Z DEBUG [input] log/input.go:530 Update existing file for harvesting: /var/tmp/dummy_log_4.log, offset: 3490
2021-02-15T16:31:50.802Z DEBUG [input] log/input.go:583 Harvester for file is still running: /var/tmp/dummy_log_4.log
2021-02-15T16:31:50.802Z DEBUG [input] log/input.go:226 input states cleaned up. Before: 1, After: 1, Pending: 0
2021-02-15T16:31:55.807Z DEBUG [harvester] log/log.go:107 End of file reached: /var/tmp/dummy_log_4.log; Backoff now.
2021-02-15T16:31:56.873Z ERROR [kafka] kafka/client.go:317 Kafka (topic=sktopic): kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
^C
[root@ip-172-31-4-58 filebeat]# telnet b-1.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com 9094
Trying 172.31.17.54...
Connected to b-1.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com.
Escape character is '^]'.
^CConnection closed by foreign host.
[root@ip-172-31-4-58 filebeat]# telnet b-2.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com 9094
Trying 172.31.44.120...
Connected to b-2.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com.
Escape character is '^]'.
^CConnection closed by foreign host.
[root@ip-172-31-4-58 filebeat]# telnet b-3.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com 9094
Trying 172.31.5.246...
Connected to b-3.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com.
Escape character is '^]'.

All the brokers are reachable here, but it still throws the error. Is anyone else facing the same error? Please suggest a fix.
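A successful telnet connection only shows that the TCP port is open; it does not exercise the TLS handshake that the brokers expect on port 9094. The following is a minimal sketch of a handshake check, assuming openssl is installed on the client machine. The helper names check_tls and check_all and the DRY_RUN variable are hypothetical, introduced only for this illustration; the broker addresses are the ones from this thread.

```shell
#!/usr/bin/env bash
# Broker endpoints from this thread (9094 is the TLS port used above).
BROKERS="b-1.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094
b-2.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094
b-3.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094"

# Attempt a TLS handshake against one host:port.
# With DRY_RUN=1 the openssl command is only printed, not executed.
check_tls() {
  endpoint="$1"
  host="${endpoint%%:*}"   # strip the ":port" suffix for SNI
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "openssl s_client -connect ${endpoint} -servername ${host}"
    return 0
  fi
  # </dev/null closes stdin so s_client exits once the handshake is done.
  openssl s_client -connect "$endpoint" -servername "$host" </dev/null 2>&1 \
    | grep -E 'Verify return code|subject=|issuer='
}

# Run the check against every broker in the list.
check_all() {
  for broker in $BROKERS; do
    echo "== $broker"
    check_tls "$broker"
  done
}
```

Calling check_all prints the certificate subject, issuer and verification result for each broker. Passing -CAfile with a PEM bundle to openssl s_client would verify the chain against that specific bundle instead of the system trust store.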

Please, can you format your configs, shell outputs, commands, etc. in markdown? It's difficult to follow :sweat:

Judging from other posts found using the forum search, that Kafka error comes from the Kafka Golang library.

And please, don't clone posts: "Filebeat 7.11 is not publishing application log into AWS MSK-2.2.1, getting error kafka: client has run out of available brokers to talk"

Hi Mario,

This issue has been resolved.

We resolved the SSL handshake issue with MSK by adding the following entries to the Filebeat config file:
ssl.enabled: true
tls.certificate_authorities - "usr/share/softwares/cert/kafka.client.truststore.jks"

Alternatively, we can convert the .jks file into a .pem file and provide the parameter below in the Filebeat config file:

ssl.certificate_authorities: ["/usr/share/softwares/cert/certfile.pem"]
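For reference, here is a sketch of how the whole Kafka output section might look with the PEM variant. It assumes the ssl.* setting names documented for Filebeat 7.x and, to my knowledge, a PEM-encoded CA bundle at the path given above. The scratch file name filebeat-kafka-output.yml is hypothetical; the section would need to be merged into the real filebeat.yml.

```shell
#!/usr/bin/env bash
# Write a candidate output.kafka section to a scratch file for review.
# The scratch file name is a placeholder, not a path Filebeat reads by itself.
SNIPPET="${TMPDIR:-/tmp}/filebeat-kafka-output.yml"

cat > "$SNIPPET" <<'EOF'
output.kafka:
  enabled: true
  hosts:
    - "b-1.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094"
    - "b-2.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094"
    - "b-3.sk-kafka-dev.vp11l7.c5.kafka.eu-west-1.amazonaws.com:9094"
  topic: "sktopic"

  # TLS settings for the Kafka output sit under the ssl.* namespace.
  ssl.enabled: true
  # Assumed to be a PEM bundle converted from the JKS truststore.
  ssl.certificate_authorities: ["/usr/share/softwares/cert/certfile.pem"]
EOF

echo "wrote $SNIPPET"
```

Note that the settings are indented under output.kafka; in the config posted earlier in the thread the indentation was lost, so it is worth checking the real file as well.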

With either of the above, we are able to successfully write and read TLS-encrypted data from AWS MSK.
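For anyone who needs the .jks to .pem conversion mentioned above, one possible approach is sketched below using the JDK's keytool. It assumes keytool is on the PATH and that the truststore password is "changeit" (the JDK default, which may not match your setup). The function names extract_pem_certs and jks_to_pem are hypothetical helpers for this example.

```shell
#!/usr/bin/env bash
# Keep only the PEM certificate blocks from `keytool -list -rfc` output
# (read on stdin), dropping alias names, dates and other metadata lines.
extract_pem_certs() {
  sed -n '/-----BEGIN CERTIFICATE-----/,/-----END CERTIFICATE-----/p'
}

# Export every certificate in a JKS truststore into one PEM bundle.
#   $1 = path to the .jks file
#   $2 = path of the .pem file to write
#   $3 = store password (assumed default: changeit)
jks_to_pem() {
  keytool -list -rfc -keystore "$1" -storepass "${3:-changeit}" \
    | extract_pem_certs > "$2"
}
```

With the paths from this thread, the call would look like: jks_to_pem /tmp/kafka.client.truststore.jks /usr/share/softwares/cert/certfile.pem. The resulting file can then be referenced from ssl.certificate_authorities.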