Should Kafka Input handle Exceptions during poll()?

Hi,

The Kafka producer and consumer clients do a good job of making sure that most errors are not propagated up to Logstash. However, some errors can slip through, especially during a cluster restart.

Our problem is that with SASL_SSL enabled, the consumer receives an authentication exception when it reconnects. Brokers hit the same error, but they retry until the connection succeeds.

I would suggest adding a begin/rescue around the poll() call and catching ApiException there. I see no problem with simply continuing, even if an error persists, rather than terminating the pipeline.

This approach solves our problem, since under the hood the KafkaConsumer will successfully resume its connection to the brokers.

# Wrap poll() in begin/rescue
begin 

  records = consumer.poll(poll_timeout_ms) # may raise an ApiException subclass

  for record in records do
    codec_instance.decode(record.value.to_s) do |event|
      decorate(event)
      if @decorate_events
        event.set("[@metadata][kafka][topic]", record.topic)
        event.set("[@metadata][kafka][consumer_group]", @group_id)
        event.set("[@metadata][kafka][partition]", record.partition)
        event.set("[@metadata][kafka][offset]", record.offset)
        event.set("[@metadata][kafka][key]", record.key)
        event.set("[@metadata][kafka][timestamp]", record.timestamp)
      end
      logstash_queue << event
    end
  end
  # Manual offset commit
  if @enable_auto_commit == "false"
    consumer.commitSync
  end

# Rescue ApiException and keep the pipeline running
rescue org.apache.kafka.common.errors.ApiException => ae
  logger.warn("Exception during poll occurred", :exception => ae)
  # Perhaps sleep a few seconds before continuing?
end
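The "sleep a few seconds" idea could be a capped exponential backoff around poll(). Below is a minimal plain-Ruby sketch of that pattern; `poll_with_backoff` and the `ApiException` stub are illustrative names, not the plugin's actual API, and a real implementation would rescue the Kafka client's Java exception instead:

```ruby
# Stub standing in for org.apache.kafka.common.errors.ApiException
class ApiException < StandardError; end

# Retry the given poll block with capped exponential backoff
# instead of letting the exception terminate the pipeline.
def poll_with_backoff(initial: 1.0, max_backoff: 8.0, &poll)
  backoff = initial
  begin
    poll.call
  rescue ApiException => e
    warn("Exception during poll occurred: #{e.message}; retrying in #{backoff}s")
    sleep(backoff)
    backoff = [backoff * 2, max_backoff].min
    retry
  end
end

# Usage: a stubbed poll that fails twice (e.g. auth not ready yet), then succeeds
attempts = 0
records = poll_with_backoff(initial: 0.01, max_backoff: 0.05) do
  attempts += 1
  raise ApiException, "authentication failed" if attempts < 3
  ["record-1", "record-2"]
end
```

The cap keeps the consumer probing regularly during a long broker restart without the delay growing unbounded.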
