Unable to create Kafka consumer from given configuration

I'm running Logstash 5.0 with the Kafka input plugin and got the message below. I'm not sure why it is an ArgumentError or which argument is wrong. Any ideas?

[2016-12-09T16:32:43,420][DEBUG][org.apache.kafka.clients.consumer.KafkaConsumer] Starting the Kafka consumer
[2016-12-09T16:32:43,420][DEBUG][org.apache.kafka.clients.consumer.KafkaConsumer] The Kafka consumer has closed.
[2016-12-09T16:32:43,420][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer}
[2016-12-09T16:32:43,422][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::Kafka bootstrap_servers=>"10.176.95.8", topics=>["topic1-elasticsearch_1"], group_id=>"logstash_1", client_id=>"logstash_1", codec=><LogStash::Codecs::Plain id=>"plain_176fb102-40e7-4e46-943d-15125f10cb5c", enable_metric=>true, charset=>"UTF-8">, consumer_threads=>3, decorate_events=>true, type=>"nginx-access", id=>"94d607dfe60d06ecaedcf4b4ac7413882a4-1", enable_metric=>true, auto_commit_interval_ms=>"5000", enable_auto_commit=>"true", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", ssl=>false>
Error: uncaught throw Failed to construct kafka consumer in thread 0x2e05a
Exception: ArgumentError
Stack: org/jruby/RubyKernel.java:1283:in `throw'
/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.5/lib/logstash/inputs/kafka.rb:268:in `create_consumer'
/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.5/lib/logstash/inputs/kafka.rb:179:in `run'

Here is my Kafka input configuration:

bootstrap_servers => "10.176.95.8"
topics => ["topic1-elasticsearch_1"]
group_id => "logstash_1"
client_id => "logstash_1"

I'm having a similar problem, but with Logstash 2.4.1 and the Kafka input plugin 6.2.0.

{:timestamp=>"2016-12-19T18:04:05.470000-0200", :message=>"Flushing buffer at interval", :instance=>"#<LogStash::Outputs::ElasticSearch::Buffer:0x68cd91f4 @operations_mutex=#Mutex:0x660a729d, @max_size=500, @operations_lock=#Java::JavaUtilConcurrentLocks::ReentrantLock:0x6235fa84, @submit_proc=#Proc:0x23f10f85@/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/common.rb:57, @logger=#<Cabin::Channel:0x165a1ffc @metrics=#<Cabin::Metrics:0x5b763669 @metrics_lock=#Mutex:0x471887e9, @metrics={}, @channel=#<Cabin::Channel:0x165a1ffc ...>>, @subscriber_lock=#Mutex:0x6cbe35a8, @level=:debug, @subscribers={239146=>#<Cabin::Subscriber:0x4072dc60 @output=#<Cabin::Outputs::IO:0x43563c48 @io=#<File:/var/log/logstash/logstash.log>, @lock=#Mutex:0x5e5852ff>, @options={}>, 239148=>#<Cabin::Subscriber:0x1fd5079a @output=#<Cabin::Outputs::IO:0x499249dc @io=#<IO:fd 1>, @lock=#Mutex:0x3bd498da>, @options={:level=>:fatal}>}, @data={}>, @last_flush=2016-12-19 18:04:04 -0200, @flush_interval=1, @stopping=#Concurrent::AtomicBoolean:0x10d9d048, @buffer=[], @flush_thread=#<Thread:0x4b5ea726 run>>", :interval=>1, :level=>:debug, :file=>"logstash/outputs/elasticsearch/buffer.rb", :line=>"90", :method=>"interval_flush"}
{:timestamp=>"2016-12-19T18:04:05.493000-0200", :message=>"Unable to create Kafka consumer from given configuration", :kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :level=>:error, :file=>"logstash/inputs/kafka.rb", :line=>"315", :method=>"create_consumer"}
{:timestamp=>"2016-12-19T18:04:05.502000-0200", :message=>"A plugin had an unrecoverable error. Will restart this plugin.\n Plugin: <LogStash::Inputs::Kafka codec=><LogStash::Codecs::JSON charset=>"UTF-8">, auto_commit_interval_ms=>"5000", auto_offset_reset=>"earliest", bootstrap_servers=>"server1:9092,server2:9092,server3:9092", client_id=>"logstash", connections_max_idle_ms=>"30000", consumer_threads=>2, enable_auto_commit=>"true", fetch_max_wait_ms=>"500", fetch_min_bytes=>"1000", group_id=>"event_handler", heartbeat_interval_ms=>"10000", max_partition_fetch_bytes=>"60000000", session_timeout_ms=>"30000", poll_timeout_ms=>600, security_protocol=>"SSL", ssl_truststore_location=>"/var/private/ssl/logstash.truststore.jks", ssl_truststore_password=>, ssl_keystore_location=>"/var/private/ssl/logstash.keystore.jks", ssl_keystore_password=>, ssl_key_password=>, topics=>["zupme-gateway"], key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", ssl=>false, sasl_mechanism=>"GSSAPI", decorate_events=>false>\n Error: uncaught throw Failed to construct kafka consumer in thread 0x3a63a\n Exception: ArgumentError\n Stack: org/jruby/RubyKernel.java:1283:in `throw'\n/opt/logstash/vendor/local_gems/27c10342/logstash-input-kafka-6.2.0/lib/logstash/inputs/kafka.rb:316:in `create_consumer'\n/opt/logstash/vendor/local_gems/27c10342/logstash-input-kafka-6.2.0/lib/logstash/inputs/kafka.rb:224:in `run'\norg/jruby/RubyFixnum.java:275:in `times'\norg/jruby/RubyEnumerator.java:274:in `each'\norg/jruby/RubyEnumerable.java:757:in `map'\n/opt/logstash/vendor/local_gems/27c10342/logstash-input-kafka-6.2.0/lib/logstash/inputs/kafka.rb:224:in `run'\n/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:342:in `inputworker'\n/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:336:in `start_input'", :level=>:error, :file=>"logstash/pipeline.rb", :line=>"353", :method=>"inputworker"}

Apparently it fails to create the consumer thread, but I'm not sure exactly why. Frustrating...

You can try the command below to check the configuration; it will show any configuration errors in detail:

/usr/share/logstash/bin/logstash --log.level=debug --path.settings=/etc/logstash --config.debug -t

Also, a hint: the port number is required in the Kafka input's bootstrap_servers setting, so if it is missing, add the port and try again. Good luck!

bootstrap_servers => "10.176.95.8:9092"
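
If you want to catch a missing port before starting Logstash, here is a minimal sketch in Python. It only checks that each comma-separated entry looks like host:port; the function name check_bootstrap_servers is made up for this example and is not part of Logstash or Kafka. It does not handle IPv6 literals and does not contact the brokers.

```python
def check_bootstrap_servers(value):
    """Return a list of problems found in a comma-separated host:port string."""
    problems = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            problems.append("empty entry")
            continue
        # Split on the last colon so the host part is kept intact.
        host, sep, port = entry.rpartition(":")
        if not sep:
            problems.append(f"{entry!r}: missing port (expected host:port)")
        elif not host:
            problems.append(f"{entry!r}: missing host (expected host:port)")
        elif not port.isdigit() or not 0 < int(port) < 65536:
            problems.append(f"{entry!r}: invalid port {port!r}")
    return problems


if __name__ == "__main__":
    # The first value is the setting from the original post, the second is the fix.
    for setting in ("10.176.95.8", "10.176.95.8:9092"):
        print(setting, "->", check_bootstrap_servers(setting) or "ok")
```

An empty list means every entry has both a host and a numeric port; it says nothing about whether a broker is actually listening there.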

In my case, I'm using the correct host:port notation, but Logstash is still unable to create the consumer. I had to enable debug mode in Logstash to see what was going on; otherwise nothing was shown in the logs.

But still, I don't have a clear reason why it's not working.
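
Since the configuration in the log above sets security_protocol to "SSL" with a truststore and a keystore path, one thing that might be worth ruling out is whether those files exist and are readable by the user that runs Logstash. This is only a guess at a possible cause, not a confirmed diagnosis. A minimal sketch in Python, where the helper name unreadable_files is made up for this example:

```python
import os


def unreadable_files(paths):
    """Return the paths that are missing or not readable by the current user."""
    return [p for p in paths if not (os.path.isfile(p) and os.access(p, os.R_OK))]


if __name__ == "__main__":
    # Paths copied from the plugin settings shown in the log above.
    stores = [
        "/var/private/ssl/logstash.truststore.jks",
        "/var/private/ssl/logstash.keystore.jks",
    ]
    for path in unreadable_files(stores):
        print("cannot read:", path)
```

Run it as the same user that runs Logstash, otherwise the permission check does not tell you much. It does not verify the store passwords or the certificate contents.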
