Hi all
here i am trying to push Elasticsearch data into kafka
input {
elasticsearch {
hosts => "192.168.1.59:9233"
index => "titanic1"
# user => "elastic"
# password => "changeme"
# query => '{ "query":{ "match_all": {} }, "sort": [ "_doc" ] }'
# query => '{ "query":{"range" : { "@timestamp" : { "gte" : "now-1m/m" } } }, "sort": [ "_doc" ] }'
# query => '{ "query":{"range" : { "@timestamp" : { "gte" : "now-1m" } } }, "sort": [ "_doc" ] }'
size => 5000
scroll => "5m"
docinfo => true
# docinfo_fields => [ "_id" ]
# docinfo_target => "@metadata"
}
}
#filter {
# mutate {
# add_field => {
# 'doc_id' => "%{[@metadata][_id]}"
# }
# }
#}
output {
if [Pclass] == "2"
{
kafka {
codec => plain {
format => "%{message}"
}
# codec => json
bootstrap_servers => ["192.168.1.70:9092"]
#broker_list => "192.168.1.70:9092"
topic_id => "sample5"
}
#exec {
# command => "curl -XDELETE 192.168.1.59:9233/%{[@metadata][_index]}/%{[@metadata][_type]}/%{[@metadata][_id]}?pretty"
#}
#stdout {codec => rubydebug }
#stdout { codec => rubydebug { metadata => true } }
}
}
it is able to contact kafka
[root@sgplrhel6 ~]# netstat -punta | grep 192.168.1.75
tcp 0 0 ::ffff:192.168.1.70:9092 ::ffff:192.168.1.75:53596 ESTABLISHED 15323/java
and logstash also able to create topic_name in kafka if not exists
but logstash failed to push data into topic
here you can see logstash logs
[2017-10-18T17:13:24,759][DEBUG][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,759][DEBUG][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,760][DEBUG][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,760][DEBUG][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,760][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>9, :failures=>9, :sleep=>0.01}
[2017-10-18T17:13:24,761][DEBUG][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0 {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 28 record(s) expired due to timeout while requesting metadata from brokers for sample5-0}
[2017-10-18T17:13:24,761][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>13, :failures=>13, :sleep=>0.01}
Thank you.