We have a Logstash config that sends data to a Kafka broker. The config is as follows:
input {
    file {
        path => "file path"
        start_position => "beginning"
        ignore_older => 0
        sincedb_path => "/dev/null"
        codec => multiline {
            pattern => "^.(D|E|S|L|X|O)"
            what => "previous"
            max_lines => 4000
        }
    }
}
filter {
    ....
}
output {
    if "_dateparsefailure" not in [tags] {
        elasticsearch {
            hosts => "9.109.184.72:9200"
            index => "stream-warehouse-index"
            template_overwrite => true
        }
        stdout {
            codec => rubydebug
        }
        kafka {
            topic_id => "MFSYSLOGQUEUE"
            bootstrap_servers => ["localhost:9092"]
        }
    }
}
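To rule out the filter block and the elasticsearch output, the kafka output can also be exercised on its own. A minimal test pipeline, as a sketch (the stdin input here is only for feeding test lines by hand):

input {
    stdin { }
}
output {
    kafka {
        topic_id => "MFSYSLOGQUEUE"
        bootstrap_servers => "localhost:9092"
    }
}

If this also fails with the same metadata timeout, the rest of the pipeline is not involved.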
With the debug option turned on, we get the following messages:
    16:42:58.589 [[main]>worker1] DEBUG logstash.outputs.kafka - KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.}           
    16:42:58.595 [[main]>worker1] INFO  logstash.outputs.kafka - Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>9, :failures=>9, :sleep=>0.01}
This error usually means the producer can open a TCP connection to the bootstrap server but never receives usable cluster metadata back, and the 60000 ms matches the metadata_fetch_timeout_ms setting below. The other details we are able to locate from the debug output are:
topic_id = "MFSYSLOGQUEUE"
bootstrap_servers = "localhost:9092"
id = "489a7f9bf2a0d2cc7a18871490f3ba19272b2231-10"
enable_metric = true
codec = <LogStash::Codecs::Plain id=>"plain_416b040b-a35b-46fd-b014-d8e4d588dacc", enable_metric=>true, charset=>"UTF-8">
workers = 1
acks = "1"
batch_size = 16384
block_on_buffer_full = true
buffer_memory = 33554432
compression_type = "none"
key_serializer = "org.apache.kafka.common.serialization.StringSerializer"
linger_ms = 0
max_request_size = 1048576
metadata_fetch_timeout_ms = 60000
metadata_max_age_ms = 300000
receive_buffer_bytes = 32768
reconnect_backoff_ms = 10
retry_backoff_ms = 100
send_buffer_bytes = 131072
ssl = false
security_protocol = "PLAINTEXT"
sasl_mechanism = "GSSAPI"
timeout_ms = 30000
value_serializer = "org.apache.kafka.common.serialization.StringSerializer"
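Since the failure is specifically a metadata fetch, we can also confirm from the broker side that the topic exists and every partition has a live leader. A sketch using the Kafka CLI (the ZooKeeper address and the parcel path are assumptions; adjust them to your installation):

/opt/cloudera/parcels/KAFKA/bin/kafka-topics \
    --zookeeper localhost:2181 \
    --describe --topic MFSYSLOGQUEUE

If the topic is missing (and auto-creation is disabled), or a partition shows leader -1, producers cannot obtain usable metadata for it.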
We have checked the Kafka connection using telnet:
[root@analyticslab72 ~]# telnet localhost 9092
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
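The telnet test only proves that the TCP port is open; it says nothing about the metadata exchange. To test at the protocol level, the console producer that ships with Kafka can be used (the parcel path is an assumption; adjust to wherever your Kafka CLI tools live):

/opt/cloudera/parcels/KAFKA/bin/kafka-console-producer \
    --broker-list localhost:9092 \
    --topic MFSYSLOGQUEUE

If typing a line here also hangs and eventually reports a metadata timeout, the problem is on the broker side rather than in the Logstash output plugin.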
Kafka is set up as a service in Cloudera Manager, and it is configured to listen on this port using:
listeners=PLAINTEXT://0.0.0.0:9092
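One thing we are unsure about: when a broker binds to the wildcard address 0.0.0.0, Kafka hands clients its advertised address, not the bind address, in metadata responses, so clients need advertised.listeners (advertised.host.name on pre-0.9 brokers) to point at a host they can actually reach. A sketch of the broker settings we believe would apply, assuming the broker host is analyticslab72 (taken from our shell prompt):

# server.properties, or the matching safety valve in Cloudera Manager
listeners=PLAINTEXT://0.0.0.0:9092
# Assumption: analyticslab72 resolves from the Logstash host; use the real FQDN or IP
advertised.listeners=PLAINTEXT://analyticslab72:9092

Since Logstash connects to localhost on the same machine, an unresolvable advertised hostname would explain why the TCP connect succeeds while the metadata update times out.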