Logstash output plugin kafka is not pushing data into kafka


(Kabali12345) #1

Hi all,
I am trying to push Elasticsearch data into Kafka with the following Logstash configuration:

input {
  elasticsearch {
    hosts => "192.168.1.59:9233"
    index => "titanic1"
#    user => "elastic"
#    password => "changeme"
#    query => '{ "query":{ "match_all": {} }, "sort": [ "_doc" ] }'
#    query => '{ "query":{"range" : {    "@timestamp" : {        "gte" : "now-1m/m"   } } }, "sort": [ "_doc" ] }'
#    query => '{ "query":{"range" : {    "@timestamp" : {        "gte" : "now-1m"   } } }, "sort": [ "_doc" ] }'
    size => 5000
    scroll => "5m"
    docinfo => true
#    docinfo_fields => [ "_id" ]
#    docinfo_target => "@metadata"
  }
}
#filter {
#  mutate {
#    add_field => {
#      'doc_id' => "%{[@metadata][_id]}"
#    }
#  }
#}
output {
  if [Pclass] == "2" {
    kafka {
      codec => plain {
        format => "%{message}"
      }
#      codec => json
      bootstrap_servers => ["192.168.1.70:9092"]
#      broker_list => "192.168.1.70:9092"
      topic_id => "sample5"
    }
#    exec {
#      command => "curl -XDELETE 192.168.1.59:9233/%{[@metadata][_index]}/%{[@metadata][_type]}/%{[@metadata][_id]}?pretty"
#    }
#    stdout {codec => rubydebug }
#    stdout { codec => rubydebug { metadata => true } }
  }
}

Logstash is able to connect to Kafka:

[root@sgplrhel6 ~]# netstat -punta | grep 192.168.1.75
tcp        0      0 ::ffff:192.168.1.70:9092    ::ffff:192.168.1.75:53596   ESTABLISHED 15323/java    

Logstash is also able to create the topic in Kafka if it does not exist,
but it fails to push data into the topic.

Here are the Logstash logs:

[2017-10-18T17:13:24,759][DEBUG][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,759][DEBUG][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,760][DEBUG][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,760][DEBUG][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,760][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>9, :failures=>9, :sleep=>0.01}
[2017-10-18T17:13:24,761][DEBUG][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,761][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>13, :failures=>13, :sleep=>0.01}

Thank you.


(David Pilato) #2

I moved your question to #logstash


(Kabali12345) #3

After updating all Logstash plugins,
I now see the following logs:

[2017-10-18T19:55:36,794][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>6, :failures=>6, :sleep=>0.01}
[2017-10-18T19:56:06,901][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.01}
[2017-10-18T19:56:06,903][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>3, :failures=>3, :sleep=>0.01}
[2017-10-18T19:56:06,905][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>5, :failures=>5, :sleep=>0.01}
[2017-10-18T19:56:06,910][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>6, :failures=>6, :sleep=>0.01}
[2017-10-18T19:56:37,019][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.01}
[2017-10-18T19:56:37,029][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>5, :failures=>5, :sleep=>0.01}
[2017-10-18T19:56:37,031][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>3, :failures=>3, :sleep=>0.01}
[2017-10-18T19:56:37,031][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>6, :failures=>6, :sleep=>0.01}

(Kabali12345) #4

My problem was resolved by adding the line below to server.properties:

listeners=PLAINTEXT://192.168.1.70:9092

[root@sgplrhel6 config]# pwd
/opt/kafka_2.10-0.10.0.1/config
[root@sgplrhel6 config]# ls
connect-console-sink.properties connect-file-source.properties log4j.properties zookeeper.properties
connect-console-source.properties connect-log4j.properties producer.properties
connect-distributed.properties connect-standalone.properties server.properties
connect-file-sink.properties consumer.properties tools-log4j.properties
[root@sgplrhel6 config]# vi server.properties
[root@sgplrhel6 config]#
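
A likely explanation for why this helps (an assumption, nothing in this thread confirms it): when `listeners` is not set, the broker can advertise its hostname rather than its IP in metadata responses. A client can then open a TCP connection to the bootstrap IP, as the netstat output shows, and still time out when it tries to reach the advertised address. The sketch below is one way to check, from the Logstash host, whether a given broker address resolves and accepts TCP connections. `check_endpoint` is a hypothetical helper written for this illustration, it uses only the Python standard library, and it does not speak the Kafka protocol.

```python
import socket


def check_endpoint(host, port, timeout=3.0):
    """Classify a broker address as seen from the machine running this script.

    Returns "unresolvable" if the name cannot be resolved, "unreachable" if
    no TCP connection can be opened, and "ok" if a connection succeeds.
    """
    try:
        # Resolve first so a name-resolution failure is reported separately
        # from a refused or timed-out connection.
        infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        return "unresolvable"

    for family, socktype, proto, _canonname, sockaddr in infos:
        try:
            with socket.socket(family, socktype, proto) as sock:
                sock.settimeout(timeout)
                sock.connect(sockaddr)
                return "ok"
        except OSError:
            # Refused, timed out, or no route: try the next resolved address.
            continue
    return "unreachable"
```

Run on the Logstash host, `check_endpoint("192.168.1.70", 9092)` tests the bootstrap address from the config above, and `check_endpoint("sgplrhel6", 9092)` tests the broker's hostname, assuming the name in the shell prompt is what the broker would advertise. If the IP works but the hostname does not, that would be consistent with the metadata timeout in the logs.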

