Logstash cannot consume messages from Kafka

We configured Logstash with a "kafka" input and an "elasticsearch" output, but the Logstash instance does not seem to consume any messages from Kafka.

The Logstash version and its configuration are as follows:

[root@elk logstash-1.5]# logstash --version
logstash 1.5.0.rc2

[root@elk logstash-1.5]# cat /etc/logstash/central.conf
input {
  kafka {
    zk_connect => "hdp4:2181"
    topic_id => "test"
  }
}

output {
  elasticsearch {
    cluster => "logstash"
  }
}
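One thing worth checking with this input: the plugin registers as the consumer group "logstash" and by default only reads messages produced after it first connects, so pre-existing messages are skipped. The sketch below forces a re-read from the earliest offset; the group_id, reset_beginning, and auto_offset_reset option names are taken from the 1.5-era logstash-input-kafka plugin and should be verified against the docs for your installed plugin version:

input {
  kafka {
    zk_connect => "hdk4:2181"        # same ZooKeeper as above (hypothetical host)
    topic_id => "test"
    group_id => "logstash"           # consumer group node created in ZooKeeper
    reset_beginning => true          # start from the earliest available offset
    auto_offset_reset => "smallest"  # instead of only newly arriving messages
  }
}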

As you can see, the connection to ZooKeeper has been established:

[root@elk logstash-1.5]# netstat -an | grep 2181
tcp 0 0 ::ffff:10.28.0.233:54260 ::ffff:10.80.3.234:2181 ESTABLISHED
[root@elk logstash-1.5]# ping hdp4
PING hdp4 (10.80.3.234) 56(84) bytes of data.
64 bytes from hdp4 (10.80.3.234): icmp_seq=1 ttl=63 time=0.729 ms
64 bytes from hdp4 (10.80.3.234): icmp_seq=2 ttl=63 time=1.08 ms

We can also confirm that there are messages in Kafka available to consume:

[root@hdp6 bin]# ./kafka-console-consumer.sh --zookeeper 10.80.3.234:2181 --topic test --from-beginning
May 13 14:25:54 lek-aix61 auth|security:err|error sshd[6619256]: error: Could not load host key: /etc/ssh/ssh_host_ecdsa_key
May 13 14:25:56 lek-aix61 auth|security:info sshd[6619256]: Accepted password for root from 10.28.0.233 port 58526 ssh2
May 13 14:29:20 lek-aix61 auth|security:info sshd[6619256]: Received disconnect from 10.28.0.233: 11: disconnected by user

After logging into the ZooKeeper server, we found that no "offsets" sub-node had been created for logstash:

[zk: localhost:2181(CONNECTED) 31] ls /consumers/console-consumer-28776
[offsets, owners, ids]
[zk: localhost:2181(CONNECTED) 32] ls /consumers/logstash
[owners, ids]
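To see whether the "logstash" consumer group has committed any offsets at all, and how far behind the log it is, Kafka 0.8 ships an offset-checker tool. A sketch of the invocation, reusing the ZooKeeper host and group name from above (run it from the Kafka bin directory; output columns include the committed offset, log end offset, and lag per partition):

[root@hdp6 bin]# ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper hdp4:2181 --group logstash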

Thanks a lot for your support in advance.

We noticed the change log of the newly released Logstash 1.5.0 RC4:

  • Kafka:
    • Merged @joekiller's plugin to Logstash to get events from Kafka (#1472)
    • Added support for whitelisting and blacklisting topics in the input.
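For reference, the topic whitelisting mentioned in the change log looks roughly like this in the input config. The white_list option name and its regex semantics are assumptions based on the 1.5-era kafka input; check your plugin's documentation before relying on them:

input {
  kafka {
    zk_connect => "hdp4:2181"
    white_list => "test.*"   # regex of topics to consume, used instead of topic_id
  }
}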

We will test again with the new release.

Can anybody help with the error below?

{:timestamp=>"2015-05-15T13:54:57.043000+0800", :message=>"Failed to flush outgoing items", :outgoing_count=>2, :exception=>#<NoMethodError: undefined method `[]' for nil:NilClass>, :backtrace=>["/opt/logstash-1.5/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-0.2.4-java/lib/logstash/outputs/elasticsearch.rb:464:in `flush'", "/opt/logstash-1.5/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-0.2.4-java/lib/logstash/outputs/elasticsearch.rb:462:in `flush'", "/opt/logstash-1.5/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-0.2.4-java/lib/logstash/outputs/elasticsearch.rb:460:in `flush'", "/opt/logstash-1.5/vendor/bundle/jruby/1.9/gems/stud-0.0.19/lib/stud/buffer.rb:219:in `buffer_flush'", "org/jruby/RubyHash.java:1341:in `each'", "/opt/logstash-1.5/vendor/bundle/jruby/1.9/gems/stud-0.0.19/lib/stud/buffer.rb:216:in `buffer_flush'", "/opt/logstash-1.5/vendor/bundle/jruby/1.9/gems/stud-0.0.19/lib/stud/buffer.rb:112:in `buffer_initialize'", "org/jruby/RubyKernel.java:1507:in `loop'", "/opt/logstash-1.5/vendor/bundle/jruby/1.9/gems/stud-0.0.19/lib/stud/buffer.rb:110:in `buffer_initialize'"], :level=>:warn, :file=>"stud/buffer.rb", :line=>"231", :method=>"buffer_flush"}

Thanks!

I think you've got the same problem described here. Update the logstash-output-elasticsearch plugin to the latest version.
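For Logstash 1.5.x the update can be done with the bundled plugin tool, roughly as below (run from the Logstash install directory; the exact path may differ on your system):

[root@elk logstash-1.5]# bin/plugin update logstash-output-elasticsearch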

I have the same problem.

[2016-05-16 17:56:56,147] INFO Got user-level KeeperException when processing sessionid:0x154b859cdaa0018 type:create cxid:0x14 zxid:0x24a txntype:-1 reqpath:n/a Error Path:/consumers/logstash/owners/mytopic Error:KeeperErrorCode = NoNode for /consumers/logstash/owners/mytopic (org.apache.zookeeper.server.PrepRequestProcessor)

Can you tell me how you dealt with this?
Thanks