Logstash kafka output: Failed to construct kafka producer

I have a problem connecting to a Kafka cluster with Logstash.

Version: Logstash 5.1.2
Plugin version: logstash-output-kafka 5.1.1
Operating system: Debian GNU/Linux 7.5 (wheezy)
Ruby version: ruby 1.9.3p194 (2012-04-20 revision 35410) [x86_64-linux]

Config File:

input {
  tcp {
    port => 9900
    codec => json
  }
}

output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    bootstrap_servers => 'kafka1:9093,kafka2:9093'
    topic_id => 'test3'
    security_protocol => "SSL"
    ssl_keystore_location => "/etc/logstash/ssl/kafka.client.keystore.jks"
    ssl_truststore_location => "/etc/logstash/ssl/kafka.client.truststore.jks"
    ssl_key_password => "mypass"
    ssl_keystore_password => "mypass"
    ssl_truststore_password => "mypass"
  }
}
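
To take the Logstash plugin out of the equation, one thing I plan to try is constructing a producer directly from Java with the same settings. A minimal sketch (the class name is just for illustration; it assumes the kafka-clients jar that this plugin version uses is on the classpath, and it reuses the servers, paths, and passwords from the config above):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SslProducerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same settings as in the Logstash kafka output above
        props.put("bootstrap.servers", "kafka1:9093,kafka2:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.location", "/etc/logstash/ssl/kafka.client.keystore.jks");
        props.put("ssl.keystore.password", "mypass");
        props.put("ssl.key.password", "mypass");
        props.put("ssl.truststore.location", "/etc/logstash/ssl/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "mypass");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // If construction throws "Failed to construct kafka producer" here
        // too, the problem is in the JVM/SSL setup, not in Logstash itself.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test3", "ping"));
            producer.flush();
        }
    }
}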

Log output:

[2017-01-27T15:14:44,029][INFO ][logstash.inputs.tcp      ] Automatically switching from json to json_lines codec {:plugin=>"tcp"}
[2017-01-27T15:14:44,041][INFO ][logstash.inputs.tcp      ] Starting tcp input listener {:address=>"0.0.0.0:9900"}
[2017-01-27T15:14:44,108][INFO ][org.apache.kafka.clients.producer.ProducerConfig] ProducerConfig values: 
	metric.reporters = []
	metadata.max.age.ms = 300000
	reconnect.backoff.ms = 10
	sasl.kerberos.ticket.renew.window.factor = 0.8
	bootstrap.servers = [172.31.11.28:9093, 172.31.5.93:9093]
	ssl.keystore.type = JKS
	sasl.mechanism = GSSAPI
	max.block.ms = 60000
	interceptor.classes = null
	ssl.truststore.password = [hidden]
	client.id = 
	ssl.endpoint.identification.algorithm = null
	request.timeout.ms = 30000
	acks = 1
	receive.buffer.bytes = 32768
	ssl.truststore.type = JKS
	retries = 0
	ssl.truststore.location = /etc/logstash/ssl/kafka.client.truststore.jks
	ssl.keystore.password = [hidden]
	send.buffer.bytes = 131072
	compression.type = none
	metadata.fetch.timeout.ms = 60000
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	buffer.memory = 33554432
	timeout.ms = 30000
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.trustmanager.algorithm = PKIX
	block.on.buffer.full = false
	ssl.key.password = [hidden]
	sasl.kerberos.min.time.before.relogin = 60000
	connections.max.idle.ms = 540000
	max.in.flight.requests.per.connection = 5
	metrics.num.samples = 2
	ssl.protocol = TLS
	ssl.provider = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	batch.size = 16384
	ssl.keystore.location = /etc/logstash/ssl/kafka.client.keystore.jks
	ssl.cipher.suites = null
	security.protocol = SSL
	max.request.size = 1048576
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
	ssl.keymanager.algorithm = SunX509
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	linger.ms = 0

[2017-01-27T15:14:44,156][INFO ][org.apache.kafka.clients.producer.KafkaProducer] Closing the Kafka producer with timeoutMillis = 0 ms.
[2017-01-27T15:14:44,163][ERROR][logstash.outputs.kafka   ] Unable to create Kafka producer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer}
[2017-01-27T15:14:44,170][ERROR][logstash.agent           ] Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>[
	"org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:335)",
	"org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:188)",
	"java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)",
	"RUBY.create_producer(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.1/lib/logstash/outputs/kafka.rb:235)",
	"RUBY.register(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.1/lib/logstash/outputs/kafka.rb:171)",
	"RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:8)",
	"RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:37)",
	"RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:229)",
	"org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)",
	"RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:229)",
	"RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:183)",
	"RUBY.start_pipeline(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:292)",
	"java.lang.Thread.run(java/lang/Thread.java:745)"]}
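
The producer gets closed with timeoutMillis = 0 right after construction, so it looks like it dies while loading the SSL stores, before ever talking to a broker. A quick sketch (class name again just for illustration) to check whether this JVM can open the two JKS files with the configured password:

import java.io.FileInputStream;
import java.security.KeyStore;

public class KeystoreCheck {
    public static void main(String[] args) throws Exception {
        // If either load() fails here (bad password, wrong format, or
        // missing crypto support), the Kafka producer would fail the same
        // way during construction.
        KeyStore ks = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("/etc/logstash/ssl/kafka.client.keystore.jks")) {
            ks.load(in, "mypass".toCharArray());
        }
        KeyStore ts = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("/etc/logstash/ssl/kafka.client.truststore.jks")) {
            ts.load(in, "mypass".toCharArray());
        }
        System.out.println("keystore entries: " + ks.size()
                + ", truststore entries: " + ts.size());
    }
}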

I tried the same settings and the same Logstash version on Ubuntu 16.04, and there it works.

The only differences (other than the OS) are the Java and Ruby versions:
Ubuntu: openjdk version "1.8.0_121", ruby 2.3.1p112 (2016-04-26) [x86_64-linux-gnu]
Debian: java version "1.8.0_121" (Oracle), ruby 1.9.3p194 (2012-04-20 revision 35410) [x86_64-linux]
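
Could the Oracle JDK be the difference? As far as I know, Oracle JDK 8 ships with limited-strength JCE policy files by default (OpenJDK does not), which can make SSL keystore and handshake setup fail when strong ciphers are involved. A one-line check to run on both boxes (illustrative class name):

import javax.crypto.Cipher;

public class JcePolicyCheck {
    public static void main(String[] args) throws Exception {
        // Prints 128 when only the limited JCE policy files are installed,
        // and 2147483647 (Integer.MAX_VALUE) when strength is unlimited.
        System.out.println("max AES key length: "
                + Cipher.getMaxAllowedKeyLength("AES"));
    }
}

If the Debian box prints 128 and the Ubuntu box prints 2147483647, installing the JCE Unlimited Strength policy files on the Oracle JDK would be the next thing to try.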

What am I missing?
What else can I check?
