Logstash outputs kafka - KafkaProducer.send() failed

We have a logstash config, which is used to send data to a Kafka broker.

The logstash config is as follows:

input {
    file {
        path => "file path"
        start_position => "beginning"
        ignore_older => 0
        sincedb_path => "/dev/null"
        codec => multiline {
            pattern => "^.(D|E|S|L|X|O)"
            what => "previous"
            max_lines => 4000
        }
    }
}

filter {
    ....
}

output {
    if "_dateparsefailure" not in [tags] {
        elasticsearch {
            hosts => "9.109.184.72:9200"
            index => "stream-warehouse-index"
            template_overwrite => true
        }

        stdout {
            codec => rubydebug
        }

        kafka {
            topic_id => "MFSYSLOGQUEUE"
            bootstrap_servers => ["localhost:9092"]
        }
    }
}
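
As an aside, to exercise the kafka output in isolation (taking the file input and the filters out of the picture), a throwaway pipeline along these lines should be enough. This is just a sketch, reusing the topic and broker address from the config above; note that it passes bootstrap_servers as a plain string, which is how the plugin documents it:

input {
    stdin { }
}

output {
    kafka {
        topic_id => "MFSYSLOGQUEUE"
        bootstrap_servers => "localhost:9092"
    }
    stdout {
        codec => rubydebug
    }
}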

With the debug option turned on, running the config above produces the following messages:

    16:42:58.589 [[main]>worker1] DEBUG logstash.outputs.kafka - KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.}           
    16:42:58.595 [[main]>worker1] INFO  logstash.outputs.kafka - Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>9, :failures=>9, :sleep=>0.01}

The other details that we were able to locate for the kafka output are listed below; note that the 60000 ms in the error matches metadata_fetch_timeout_ms:

topic_id = "MFSYSLOGQUEUE"
bootstrap_servers = "localhost:9092"
id = "489a7f9bf2a0d2cc7a18871490f3ba19272b2231-10"
enable_metric = true
codec = <LogStash::Codecs::Plain id=>"plain_416b040b-a35b-46fd-b014-d8e4d588dacc", enable_metric=>true, charset=>"UTF-8">
workers = 1
acks = "1"
batch_size = 16384
block_on_buffer_full = true
buffer_memory = 33554432
compression_type = "none"
key_serializer = "org.apache.kafka.common.serialization.StringSerializer"
linger_ms = 0
max_request_size = 1048576
metadata_fetch_timeout_ms = 60000
metadata_max_age_ms = 300000
receive_buffer_bytes = 32768
reconnect_backoff_ms = 10
retry_backoff_ms = 100
send_buffer_bytes = 131072
ssl = false
security_protocol = "PLAINTEXT"
sasl_mechanism = "GSSAPI"
timeout_ms = 30000
value_serializer = "org.apache.kafka.common.serialization.StringSerializer"

We have checked the kafka connection using telnet:

[root@analyticslab72 ~]# telnet localhost 9092
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

Kafka is set up as a service in Cloudera Manager, and it is configured to listen on this port using

listeners=PLAINTEXT://0.0.0.0:9092
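
Since telnet only proves that the port accepts connections, one thing we have not been able to confirm yet (so this is an assumption on our side) is what address the broker advertises back to clients in its metadata. With listeners bound to 0.0.0.0, the broker has to be told explicitly what to advertise; if the advertised address is not reachable or resolvable from the client, the connection succeeds but the metadata fetch times out, which would match the error above. A sketch of the relevant server.properties entries (property names vary by Kafka version; older brokers use advertised.host.name and advertised.port, and under Cloudera Manager these would be set through the service configuration rather than by hand):

# hypothetical example; adjust to the actual broker address
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://9.109.184.72:9092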

To rule out any issue on the Kafka end, we tested the Kafka installation with a simple Python script (after installing the kafka-python package).

We ran the following code from a laptop in the Python shell, pointing at the Kafka broker at 9.109.184.72:

import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

logging.basicConfig()
log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['9.109.184.72:9092'])

# Send a single message and block until the broker acknowledges it.
future = producer.send('py-topic', b'tpn1_raw_bytes')
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Log the failure; record_metadata is only available on success.
    log.exception("produce request failed")
else:
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

# Fire off a batch of messages and flush them to the broker.
for _ in range(100):
    producer.send('py-topic', b'tpn1_msg')

producer.flush()

Then, in another terminal tab, we ran the following snippet to consume the same messages:

from kafka import KafkaConsumer

consumer = KafkaConsumer('py-topic',
                         bootstrap_servers=['9.109.184.72:9092'])

for message in consumer:
    print ("%s:%d:%d:" % (message.topic, message.partition,
                                          message.offset))

We had to make sure the consumer was already running before executing the producer. The consumer then successfully printed the topic name, partition, and offset for each message.

A sample:

py-topic:1:273:
py-topic:0:251:
py-topic:0:252:
py-topic:1:274:
py-topic:0:253:
py-topic:0:254:
py-topic:0:255:
py-topic:0:256:
py-topic:0:257:
py-topic:0:258:
py-topic:0:259:
py-topic:0:260:
py-topic:0:261:
py-topic:0:262:

We were also able to consume the messages with the console consumer:

/usr/lib/kafka/bin/kafka-console-consumer.sh --zookeeper 9.109.184.72:2181 --topic py-topic
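
Depending on the Kafka version, the --zookeeper option of the console consumer is deprecated in favour of --bootstrap-server; assuming the same install path, the equivalent newer invocation would be:

/usr/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server 9.109.184.72:9092 --topic py-topic --from-beginning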

At this point, it looks like we are doing something wrong on the Logstash end.
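
One further check that might separate the two possibilities (a sketch, assuming kafka-python is also installed on the Logstash host): run the producer test again, but on the Logstash machine itself, with the exact bootstrap address and topic that the kafka output uses. If the metadata fetch also times out here, the problem is the broker metadata as seen via localhost rather than Logstash itself; if it succeeds, pointing the kafka output's bootstrap_servers at 9.109.184.72:9092 (the address that demonstrably works) would be the next thing to try.

from kafka import KafkaProducer
from kafka.errors import KafkaError

# Same bootstrap address and topic as the logstash kafka output.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('MFSYSLOGQUEUE', b'probe')

try:
    # 60 s mirrors metadata_fetch_timeout_ms from the settings above.
    record_metadata = future.get(timeout=60)
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)
except KafkaError as e:
    print('produce failed:', e)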
