Should Kafka Input handle Exceptions during poll()?


The Kafka producer and consumer clients do a good job of making sure that errors are not propagated up to Logstash. However, some errors can slip through, especially during a cluster restart.

Our problem is that with SASL_SSL enabled, the consumer receives an authentication exception when it reconnects. The same error occurs on the brokers, but they retry until it works.

I would suggest adding a begin/rescue around the poll() call and catching ApiException there. I do not see any problem with simply continuing, even if the error persists, instead of terminating the pipeline.

This approach solves our problem because, under the hood, KafkaConsumer successfully re-establishes its connection to the brokers.

# Begin before poll()
begin
  records = consumer.poll(poll_timeout_ms) # may raise an ApiException

  for record in records do
    codec_instance.decode(record.value.to_s) do |event|
      if @decorate_events
        event.set("[@metadata][kafka][topic]", record.topic)
        event.set("[@metadata][kafka][consumer_group]", @group_id)
        event.set("[@metadata][kafka][partition]", record.partition)
        event.set("[@metadata][kafka][offset]", record.offset)
        event.set("[@metadata][kafka][key]", record.key)
        event.set("[@metadata][kafka][timestamp]", record.timestamp)
      end
      logstash_queue << event
    end
  end
  # Manual offset commit
  if @enable_auto_commit == "false"
    # existing commit logic goes here (e.g. consumer.commitSync)
  end

# Rescue on ApiException
rescue org.apache.kafka.common.errors.ApiException => ae
  logger.warn("Exception during poll occurred", :exception => ae)
  # Perhaps sleep a few seconds before continuing?
end

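On the open question of sleeping before continuing, here is a minimal sketch of one option: rescue the error, wait with a capped exponential backoff, and let the surrounding loop poll again. It is plain Ruby so that it runs without the Kafka client. `FakeApiException`, `FlakyConsumer` and `poll_with_backoff` are hypothetical names invented for illustration and are not part of the plugin or of Kafka. In the plugin, the rescue would name `org.apache.kafka.common.errors.ApiException` instead.

```ruby
# Hypothetical stand-in for org.apache.kafka.common.errors.ApiException,
# so the sketch runs under plain Ruby without the Kafka client loaded.
class FakeApiException < StandardError; end

# Hypothetical consumer that fails a fixed number of times before returning
# records, mimicking authentication errors while the cluster restarts.
class FlakyConsumer
  attr_reader :calls

  def initialize(failures)
    @failures = failures
    @calls = 0
  end

  def poll(_timeout_ms)
    @calls += 1
    raise FakeApiException, "authentication failed" if @calls <= @failures
    ["record-#{@calls}"]
  end
end

# Poll once. On an API error, log, wait, and return no records so the
# caller's loop simply tries again. The delay doubles with each consecutive
# failure and is capped at max_delay. Returns [records, failure_count].
def poll_with_backoff(consumer, timeout_ms, failures_so_far,
                      base_delay: 1.0, max_delay: 30.0, sleeper: method(:sleep))
  records = consumer.poll(timeout_ms)
  [records, 0] # a successful poll resets the failure counter
rescue FakeApiException => e
  delay = [base_delay * (2**failures_so_far), max_delay].min
  warn("Exception during poll, retrying in #{delay}s: #{e.message}")
  sleeper.call(delay)
  [[], failures_so_far + 1]
end
```

The caller keeps the failure count between iterations, for example `records, failures = poll_with_backoff(consumer, poll_timeout_ms, failures)`. A real plugin would probably want a sleep that can be interrupted on shutdown rather than a plain `sleep`.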
