Should Kafka Input handle Exceptions during poll()?


(Johan Rask) #1

Hi,

Kafka clients producer and consumer does good job in making sure that any errors are not propagated up to logstash. However, some errors can slip through and especially during a cluster restart.

Our problem is that with SASL_SSL enabled, the consumer recieves an authentication exception when it reconnects. This error occurs in brokers as well but they retry until it works.

I would suggest adding a begin/rescue around the poll() method and catch ApiExceptions there. I do not see any problem to simply continue even if an error persists instead of terminating the pipeline.

This approach solves our problem since "under-the-hoods", KafkaConsumer will resume connection to brokers successfully.

#Begin before poll()
begin 

  records = consumer.poll(poll_timeout_ms) # throws Exception

  for record in records do
    codec_instance.decode(record.value.to_s) do |event|
      decorate(event)
      if @decorate_events
        event.set("[@metadata][kafka][topic]", record.topic)
        event.set("[@metadata][kafka][consumer_group]", @group_id)
        event.set("[@metadata][kafka][partition]", record.partition)
        event.set("[@metadata][kafka][offset]", record.offset)
        event.set("[@metadata][kafka][key]", record.key)
        event.set("[@metadata][kafka][timestamp]", record.timestamp)
      end
      logstash_queue << event
    end
  end
  # Manual offset commit
  if @enable_auto_commit == "false"
    consumer.commitSync
  end

# Rescue on ApiException
rescue org.apache.kafka.common.errors.ApiException => ae
  logger.warn("Exception during poll occured")
  # Perhaps sleep a few seconds before continue?
end

(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.