Kfaka not writing to elasticsearch using logstash

Here is the producer's config file of logstash:

input {
#file {

path => "/var/log/messages"

start_position => "beginning"

#}
stdin { }
}

output {
kafka {
bootstrap_servers => "kfaka-server-01:9092,kfaka-server-02:9092,kafka-server-03:9092"
topic_id => "topic01-2017-03-02"
compression_type => "snappy"
}
#stdout { codec => rubydebug }
}

consumer's config file of logstash:

input {
kafka {
bootstrap_servers => "kafka-server-01:2181,kfaka-server-02:2181,kfaka-server-03:2181"
topics => "topic01-2017-03-02"
#codec => plain
#reset_beginning => false
#consumer_threads => 5
#decorate_events => true
}
#stdin { }
}

output {
elasticsearch {
hosts => ["es-server-01:9200","es-server-02:9200"]
index => "index01-2017-03-02"
}
stdout { codec => rubydebug }
}

When I input sth from stdin in the producer's side, I can check out the messages from kafka using kfaka-console-consume.sh at the same time. That means the first logstash config file is correct, right?

But I cannot get any messages from elasticsearch, including the index.
ps: i can get messages when i change the 2nd config file's input to stdin.
can anyone help?

my soft version:

elasticsearch 5.2.1
logstash 5.2.1
kfaka 2.12-0.10.2.0

Some suggestion as fllows:
1. topics is array type like this:
topics => ["topic01-2017-03-02"]

2. set a new group_id, if there exists another logstash instance consumer with the same group id, and the offset was consumed to the latest:
group_id => "new_group"

3. set auto_offset_reset to earliest to start a new offset, or set to none to make sure if the previous offset would be found or occupied (exception would be thrown):
auto_offset_reset => "earliest"
or
auto_offset_reset => "none"

hoping this helps
--Fanfan

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.