Unable to create Kafka consumer from given configuration


(Haomeng) #1

I'm running the Logstash 5.0 kafka input plugin and got the message below. I'm not sure why it is an ArgumentError, or which argument is wrong. Any ideas?

[2016-12-09T16:32:43,420][DEBUG][org.apache.kafka.clients.consumer.KafkaConsumer] Starting the Kafka consumer
[2016-12-09T16:32:43,420][DEBUG][org.apache.kafka.clients.consumer.KafkaConsumer] The Kafka consumer has closed.
[2016-12-09T16:32:43,420][ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer}
[2016-12-09T16:32:43,422][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::Kafka bootstrap_servers=>"10.176.95.8", topics=>["topic1-elasticsearch_1"], group_id=>"logstash_1", client_id=>"logstash_1", codec=><LogStash::Codecs::Plain id=>"plain_176fb102-40e7-4e46-943d-15125f10cb5c", enable_metric=>true, charset=>"UTF-8">, consumer_threads=>3, decorate_events=>true, type=>"nginx-access", id=>"94d607dfe60d06ecaedcf4b4ac7413882a4-1", enable_metric=>true, auto_commit_interval_ms=>"5000", enable_auto_commit=>"true", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", ssl=>false>
Error: uncaught throw Failed to construct kafka consumer in thread 0x2e05a
Exception: ArgumentError
Stack: org/jruby/RubyKernel.java:1283:in `throw'
/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.5/lib/logstash/inputs/kafka.rb:268:in `create_consumer'
/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.5/lib/logstash/inputs/kafka.rb:179:in `run'


(Haomeng) #2

Here is my kafka input plugin configuration:

bootstrap_servers => "10.176.95.8"
topics => ["topic1-elasticsearch_1"]
group_id => "logstash_1"
client_id => "logstash_1"


(Edd Figueiredo) #3

I'm having a similar problem, but with Logstash version 2.4.1 and the kafka input plugin 6.2.0.

{:timestamp=>"2016-12-19T18:04:05.470000-0200", :message=>"Flushing buffer at interval", :instance=>"#<LogStash::Outputs::ElasticSearch::Buffer:0x68cd91f4 @operations_mutex=#Mutex:0x660a729d, @max_size=500, @operations_lock=#Java::JavaUtilConcurrentLocks::ReentrantLock:0x6235fa84, @submit_proc=#Proc:0x23f10f85@/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/common.rb:57, @logger=#<Cabin::Channel:0x165a1ffc @metrics=#<Cabin::Metrics:0x5b763669 @metrics_lock=#Mutex:0x471887e9, @metrics={}, @channel=#<Cabin::Channel:0x165a1ffc ...>>, @subscriber_lock=#Mutex:0x6cbe35a8, @level=:debug, @subscribers={239146=>#<Cabin::Subscriber:0x4072dc60 @output=#<Cabin::Outputs::IO:0x43563c48 @io=#<File:/var/log/logstash/logstash.log>, @lock=#Mutex:0x5e5852ff>, @options={}>, 239148=>#<Cabin::Subscriber:0x1fd5079a @output=#<Cabin::Outputs::IO:0x499249dc @io=#<IO:fd 1>, @lock=#Mutex:0x3bd498da>, @options={:level=>:fatal}>}, @data={}>, @last_flush=2016-12-19 18:04:04 -0200, @flush_interval=1, @stopping=#Concurrent::AtomicBoolean:0x10d9d048, @buffer=[], @flush_thread=#<Thread:0x4b5ea726 run>>", :interval=>1, :level=>:debug, :file=>"logstash/outputs/elasticsearch/buffer.rb", :line=>"90", :method=>"interval_flush"}
{:timestamp=>"2016-12-19T18:04:05.493000-0200", :message=>"Unable to create Kafka consumer from given configuration", :kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :level=>:error, :file=>"logstash/inputs/kafka.rb", :line=>"315", :method=>"create_consumer"}
{:timestamp=>"2016-12-19T18:04:05.502000-0200", :message=>"A plugin had an unrecoverable error. Will restart this plugin.\n Plugin: <LogStash::Inputs::Kafka codec=><LogStash::Codecs::JSON charset=>"UTF-8">, auto_commit_interval_ms=>"5000", auto_offset_reset=>"earliest", bootstrap_servers=>"server1:9092,server2:9092,server3:9092", client_id=>"logstash", connections_max_idle_ms=>"30000", consumer_threads=>2, enable_auto_commit=>"true", fetch_max_wait_ms=>"500", fetch_min_bytes=>"1000", group_id=>"event_handler", heartbeat_interval_ms=>"10000", max_partition_fetch_bytes=>"60000000", session_timeout_ms=>"30000", poll_timeout_ms=>600, security_protocol=>"SSL", ssl_truststore_location=>"/var/private/ssl/logstash.truststore.jks", ssl_truststore_password=>, ssl_keystore_location=>"/var/private/ssl/logstash.keystore.jks", ssl_keystore_password=>, ssl_key_password=>, topics=>["zupme-gateway"], key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", ssl=>false, sasl_mechanism=>"GSSAPI", decorate_events=>false>\n Error: uncaught throw Failed to construct kafka consumer in thread 0x3a63a\n Exception: ArgumentError\n Stack: org/jruby/RubyKernel.java:1283:in throw'\n/opt/logstash/vendor/local_gems/27c10342/logstash-input-kafka-6.2.0/lib/logstash/inputs/kafka.rb:316:increate_consumer'\n/opt/logstash/vendor/local_gems/27c10342/logstash-input-kafka-6.2.0/lib/logstash/inputs/kafka.rb:224:in run'\norg/jruby/RubyFixnum.java:275:intimes'\norg/jruby/RubyEnumerator.java:274:in each'\norg/jruby/RubyEnumerable.java:757:inmap'\n/opt/logstash/vendor/local_gems/27c10342/logstash-input-kafka-6.2.0/lib/logstash/inputs/kafka.rb:224:in run'\n/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:342:ininputworker'\n/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.1-java/lib/logstash/pipeline.rb:336:in `start_input'", :level=>:error, :file=>"logstash/pipeline.rb", :line=>"353", :method=>"inputworker"}

Apparently it fails to create the consumer thread, but I'm not sure exactly why... frustrating...


(Haomeng) #4

You can try the command below to probe the configuration; it will show configuration errors in detail:

/usr/share/logstash/bin/logstash --log.level=debug --path.settings=/etc/logstash --config.debug -t
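
(-t is shorthand for --config.test_and_exit, which validates the configuration and exits; combined with --config.debug and --log.level=debug it also prints the compiled pipeline configuration.)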

Also, a hint: the port number is required in the kafka input's bootstrap_servers argument, so add the port if it's missing and try again. Good luck!

bootstrap_servers => "10.176.95.8:9092"
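
For reference, a complete input block with the port added might look like this (a minimal sketch based on the settings from post #2, with everything else left at its defaults):

input {
  kafka {
    bootstrap_servers => "10.176.95.8:9092"
    topics => ["topic1-elasticsearch_1"]
    group_id => "logstash_1"
    client_id => "logstash_1"
  }
}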


(Edd Figueiredo) #5

In my case, I'm using the correct host:port notation, but Logstash is still unable to create the consumer group... I had to enable debug mode in Logstash to see what was going on; otherwise nothing was shown in the logs.

But still, I don't have a clear reason why it's not working.
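
For Logstash 2.4 the equivalent probe would be something along these lines (assuming the stock 2.x flags --debug and --configtest; the config path here is just an example):

/opt/logstash/bin/logstash --debug --configtest -f /etc/logstash/conf.d/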


(system) #6

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.