We have a logstash config, which is used to send data to a Kafka broker.
The logstash config is as follows:
input {
file {
path => "file path"
start_position => "beginning"
ignore_older => 0
sincedb_path => "/dev/null"
codec => multiline {
pattern => "^.(D|E|S|L|X|O)"
what => "previous"
max_lines => 4000
output {
if "_dateparsefailure" not in [tags]
elasticsearch {
hosts => ""
index => "stream-warehouse-index"
template_overwrite => true
stdout {
codec => rubydebug
kafka {
topic_id => "MFSYSLOGQUEUE"
bootstrap_servers => ["localhost:9092"]
With the debug option turned on, we get the following messages:
16:42:58.589 [[main]>worker1] DEBUG logstash.outputs.kafka - KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.}
16:42:58.595 [[main]>worker1] INFO logstash.outputs.kafka - Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>9, :failures=>9, :sleep=>0.01}
The other details that we are able to locate are
topic_id = "MFSYSLOGQUEUE"
bootstrap_servers = "localhost:9092"
id = "489a7f9bf2a0d2cc7a18871490f3ba19272b2231-10"
enable_metric = true
codec = <LogStash::Codecs::Plain id=>"plain_416b040b-a35b-46fd-b014-d8e4d588dacc", enable_metric=>true, charset=>"UTF-8">
workers = 1
acks = "1"
batch_size = 16384
block_on_buffer_full = true
buffer_memory = 33554432
compression_type = "none"
key_serializer = "org.apache.kafka.common.serialization.StringSerializer"
linger_ms = 0
max_request_size = 1048576
metadata_fetch_timeout_ms = 60000
metadata_max_age_ms = 300000
receive_buffer_bytes = 32768
reconnect_backoff_ms = 10
retry_backoff_ms = 100
send_buffer_bytes = 131072
ssl = false
security_protocol = "PLAINTEXT"
sasl_mechanism = "GSSAPI"
timeout_ms = 30000
value_serializer = "org.apache.kafka.common.serialization.StringSerializer"
We have checked the kafka connection using telnet:
[root@analyticslab72 ~]# telnet localhost 9092
Connected to localhost.
Escape character is '^]'.
Kafka is setup as a service in Cloudera manager and it is set to listen to this port using