The Kafka client producer and consumer do a good job of making sure that errors are not propagated up to Logstash. However, some errors can slip through, especially during a cluster restart.
Our problem is that with SASL_SSL enabled, the consumer receives an authentication exception when it reconnects. The same error occurs on the brokers as well, but they retry until authentication succeeds.
I would suggest wrapping the `poll()` call in a `begin`/`rescue` block and catching `ApiException` there. I do not see any problem with simply continuing, even if the error persists, instead of terminating the pipeline.
This approach solves our problem because, under the hood, `KafkaConsumer` eventually reconnects to the brokers successfully.
```ruby
# Begin before poll()
begin
  records = consumer.poll(poll_timeout_ms) # may raise ApiException, e.g. an authentication failure during a cluster restart
  for record in records do
    codec_instance.decode(record.value.to_s) do |event|
      decorate(event)
      if @decorate_events
        event.set("[@metadata][kafka][topic]", record.topic)
        event.set("[@metadata][kafka][consumer_group]", @group_id)
        event.set("[@metadata][kafka][partition]", record.partition)
        event.set("[@metadata][kafka][offset]", record.offset)
        event.set("[@metadata][kafka][key]", record.key)
        event.set("[@metadata][kafka][timestamp]", record.timestamp)
      end
      logstash_queue << event
    end
  end
  # Manual offset commit
  if @enable_auto_commit == "false"
    consumer.commitSync
  end
# Rescue on ApiException instead of letting it terminate the pipeline
rescue org.apache.kafka.common.errors.ApiException => ae
  logger.warn("Exception during poll occurred", :exception => ae.class.name, :message => ae.message)
  # Perhaps sleep a few seconds before continuing?
end
```
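On the open question of sleeping before continuing: one option is a capped exponential backoff, so a persistent error does not flood the log while the brokers come back. Below is a minimal sketch in plain Ruby, not plugin code. `TransientApiError`, `FlakyConsumer`, `poll_with_backoff`, `base_delay` and `max_delay` are all hypothetical names chosen for illustration; `TransientApiError` stands in for `org.apache.kafka.common.errors.ApiException` so the sketch runs without JRuby or the Kafka client.

```ruby
# Hypothetical stand-in for org.apache.kafka.common.errors.ApiException,
# so this sketch runs on plain (non-JRuby) Ruby.
class TransientApiError < StandardError; end

# Hypothetical consumer that fails a fixed number of polls before succeeding,
# imitating a broker that rejects authentication during a cluster restart.
class FlakyConsumer
  def initialize(failures)
    @failures = failures
  end

  def poll(_timeout_ms)
    if @failures > 0
      @failures -= 1
      raise TransientApiError, "authentication failed"
    end
    ["record"]
  end
end

# Poll once, retrying on TransientApiError with a capped exponential backoff.
# Never gives up, matching the suggestion to keep going instead of
# terminating the pipeline. `sleeper` is injectable so delays can be observed.
def poll_with_backoff(consumer, poll_timeout_ms, base_delay: 1.0, max_delay: 30.0,
                      sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    consumer.poll(poll_timeout_ms)
  rescue TransientApiError => e
    delay = [base_delay * (2**attempt), max_delay].min
    warn("Exception during poll occurred (#{e.message}); retrying in #{delay}s")
    sleeper.call(delay)
    attempt += 1
    retry # re-runs the begin block; `attempt` keeps its value
  end
end

# Usage: record the delays instead of actually sleeping.
delays = []
records = poll_with_backoff(FlakyConsumer.new(3), 100, sleeper: ->(s) { delays << s })
p records # => ["record"]
p delays  # => [1.0, 2.0, 4.0]
```

In the plugin itself, a real `sleep` would also delay shutdown, so a check of the pipeline's stop flag between retries would probably be needed; that part is left out of this sketch.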