Hi,
The Kafka producer and consumer clients do a good job of making sure that errors are not propagated up to Logstash. However, some errors can still slip through, especially during a cluster restart.
Our problem is that with SASL_SSL enabled, the consumer receives an authentication exception when it reconnects. The same error occurs on the brokers, but they retry until it succeeds.
I would suggest wrapping the poll() call in a begin/rescue and catching ApiExceptions there. I see no problem with simply continuing even if an error persists, instead of terminating the pipeline.
This approach solves our problem because, under the hood, the KafkaConsumer will successfully resume its connections to the brokers.
# Begin before poll()
begin
  records = consumer.poll(poll_timeout_ms) # may throw ApiException subclasses
  for record in records do
    codec_instance.decode(record.value.to_s) do |event|
      decorate(event)
      if @decorate_events
        event.set("[@metadata][kafka][topic]", record.topic)
        event.set("[@metadata][kafka][consumer_group]", @group_id)
        event.set("[@metadata][kafka][partition]", record.partition)
        event.set("[@metadata][kafka][offset]", record.offset)
        event.set("[@metadata][kafka][key]", record.key)
        event.set("[@metadata][kafka][timestamp]", record.timestamp)
      end
      logstash_queue << event
    end
  end
  # Manual offset commit
  if @enable_auto_commit == "false"
    consumer.commitSync
  end
# Rescue on ApiException
rescue org.apache.kafka.common.errors.ApiException => ae
  logger.warn("Exception during poll occurred", :exception => ae.message)
  # Perhaps sleep a few seconds before continuing?
end
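To avoid hammering the brokers while the error persists, the sleep in the rescue branch could grow into an exponential backoff. Below is a minimal sketch, assuming a hypothetical `poll_with_retry` helper and a plain callable logger; it rescues the generic `StandardError` in place of `org.apache.kafka.common.errors.ApiException` only so the sketch runs without the Kafka client on the classpath:

```ruby
# Hypothetical helper: retry poll() with exponential backoff instead of
# terminating the pipeline. The real patch would rescue
# org.apache.kafka.common.errors.ApiException here.
MAX_BACKOFF_SECONDS = 30

def poll_with_retry(consumer, poll_timeout_ms, logger, backoff: 1)
  begin
    consumer.poll(poll_timeout_ms)
  rescue StandardError => e
    logger.call("Exception during poll occurred: #{e.message}; retrying in #{backoff}s")
    sleep(backoff)
    # Double the wait on every consecutive failure, capped at MAX_BACKOFF_SECONDS.
    backoff = [backoff * 2, MAX_BACKOFF_SECONDS].min
    retry
  end
end
```

The loop never gives up, matching the suggestion above that continuing is preferable to stopping the pipeline; a bounded retry count could be added if that is considered too aggressive.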