Consuming data with logstash kafka plugin

Hi

I have a Logstash configuration file that does not consume the data I produce with Kafka.
Here is my configuration:

input {
    kafka {
       topics => "mytopic"
       bootstrap_servers => "zookeeper_ip_address:2181"
    }
}
output {
    stdout{}
}

As the documentation requires, I'm using Kafka 0.10, Logstash 2.4 and version 5.0.5 of the Logstash Kafka input plugin. https://www.elastic.co/guide/en/logstash/5.0/plugins-inputs-kafka.html#plugins-inputs-kafka-topics

I'm producing data to the topic mytopic, but I don't see anything on stdout when I launch Logstash with:

/opt/logstash/bin/logstash --debug -f /logstash.conf

Any clue please?
Thanks.

Here is what the log says:

Reading config file {:config_file=>"/logstash.conf", :level=>:debug, :file=>"logstash/config/loader.rb",     :line=>"69", :method=>"local_config"}
Plugin not defined in namespace, checking for plugin file {:type=>"input", :name=>"stdin", :path=>"logstash/inputs/stdin", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"86", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"line", :path=>"logstash/codecs/line", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"86", :method=>"lookup"}
config LogStash::Codecs::Line/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Codecs::Line/@delimiter = "\n" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Stdin/@codec = <LogStash::Codecs::Line charset=>"UTF-8", delimiter=>"\n"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Stdin/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"input", :name=>"kafka", :path=>"logstash/inputs/kafka", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"86", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"plain", :path=>"logstash/codecs/plain", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"86", :method=>"lookup"}
config LogStash::Codecs::Plain/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@topics = ["account-signup-topic"] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@bootstrap_servers = "192.168.0.20:2181" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@codec = <LogStash::Codecs::Plain charset=>"UTF-8"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@auto_commit_interval_ms = "5000" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@client_id = "logstash" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@consumer_threads = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@enable_auto_commit = "true" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@group_id = "logstash" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@key_deserializer_class = "org.apache.kafka.common.serialization.StringDeserializer" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@value_deserializer_class = "org.apache.kafka.common.serialization.StringDeserializer" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@ssl = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
config LogStash::Inputs::Kafka/@decorate_events = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"154", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"file", :path=>"logstash/outputs/file", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"86", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"stdout", :path=>"logstash/outputs/stdout", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"86", :method=>"lookup"}
starting agent {:level=>:info, :file=>"logstash/agent.rb", :line=>"213", :method=>"execute"}
starting pipeline {:id=>"main", :level=>:info, :file=>"logstash/agent.rb", :line=>"487", :method=>"start_pipeline"}
Settings: Default pipeline workers: 4

This issue is urgent for me, so I made some additional changes. With Kafka 0.10 still in place, I uninstalled Logstash 2.4 and installed Logstash 5.0. I checked logstash-input-kafka and its version is 5.0.1. I kept the same config as above. With this setup I get the error below, and Logstash starts and then stops a moment later.

fetched an invalid config {:config=>"input {  \n   stdin{}\n   kafka {\n       topics => \"account-signup-topic\"\n       bootstrap_servers => \"192.168.0.202:2181\"\n    }\n}\noutput {    \nstdout\n{\ncodec => \"json\"\n}\nelasticsearch {\n\tmanage_template => false\n         hosts => [\"192.168.0.202:9200\"]\n         user => \"fenkam\"\n\tpassword => \"mysecret\"\n\tcodec => \"json\"\n\tindex => \"mynewindex\"\n}\n     \n}\n\n\n", :reason=>"Couldn't find any input plugin named 'kafka'. Are you sure this is correct? Trying to load the kafka input plugin resulted in this error: Problems loading the requested plugin named kafka of type input.", :level=>:error}

According to the documentation, Logstash 5.0.x is compatible with logstash-input-kafka 5.x.x and Kafka 0.10, so I'm wondering whether I have done something wrong.

Any help will be greatly appreciated.
Thanks.

I'm seeing the same issue running logstash 2.4.1 / logstash-input-kafka 5.1.2 and kafka 0.10.0.1...

@sreyno Sorry, I forgot to post an update on this issue. I upgraded Logstash to 5.x and Kafka to kafka_2.11-0.10.1.0, where 0.10.1.0 is the Kafka version and 2.11 is the Scala version. After these upgrades, I used the following configuration:

input {
    kafka {
        bootstrap_servers => ["kafkaipaddress:9092"]
        topics => ["topic1","topic2"]
        codec => "json"
        group_id => "logstash1"
    }
}
output {
    stdout{}
}

Hope this helps.

Hi @jstar, maybe it had to do with a wrong bootstrap server setting. In your initial config you specified the ZooKeeper server host:port (2181 is ZooKeeper's default client port) instead of a Kafka bootstrap server, which listens on 9092 by default.
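
For anyone hitting the same thing, here is a rough sanity check in Python, not part of Logstash or the Kafka plugin. It scans a config text for bootstrap_servers entries that use ZooKeeper's default client port. The function name suspicious_bootstrap_servers is made up for this sketch, and the check is only a heuristic: it assumes default ports, and a deployment may legitimately use others.

```python
import re

# Default ZooKeeper client port; a deployment may override it,
# so a match here is a hint, not proof of a misconfiguration.
ZOOKEEPER_DEFAULT_PORT = 2181


def suspicious_bootstrap_servers(config_text):
    """Return bootstrap_servers entries that use ZooKeeper's default port.

    Hypothetical helper for illustration only; it does a plain regex scan
    and does not parse the full Logstash config grammar.
    """
    flagged = []
    # Match the setting whether it is written as a string or as an array.
    pattern = r'bootstrap_servers\s*=>\s*(\[[^\]]*\]|"[^"]*")'
    for match in re.finditer(pattern, config_text):
        # Pull out every quoted value, then split comma-separated lists.
        for quoted in re.findall(r'"([^"]*)"', match.group(1)):
            for entry in quoted.split(","):
                entry = entry.strip()
                _host, _sep, port = entry.rpartition(":")
                if port.isdigit() and int(port) == ZOOKEEPER_DEFAULT_PORT:
                    flagged.append(entry)
    return flagged
```

Run against the first config in this thread, it flags "zookeeper_ip_address:2181". The later config with "kafkaipaddress:9092" comes back with an empty list.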

Hi @sreyno, yes, I think the issue was the ZooKeeper address. Instead of giving the address of the Kafka broker, I gave the ZooKeeper one. In any case, the upgrade was also beneficial for me in many other ways.

Thanks @singrahu for pointing that out.