Logstash 5.5.1 Kafka output SASL_SSL error

Hi,

I want to produce logs to a Kafka cluster secured with SASL/Kerberos. The Kafka cluster is working and has been tested with KafkaConsoleProducer, but with my Logstash configuration the pipeline fails with the following errors:

[ERROR][logstash.outputs.kafka ] Unable to create Kafka producer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer}

[ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:335)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:188)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "RUBY.create_producer(/usr/local/src/ELK/logstash-5.5.1/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:242)", "RUBY.register(/usr/local/src/ELK/logstash-5.5.1/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:178)", "RUBY.register(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:9)", "RUBY.register(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/output_delegator.rb:41)", "RUBY.register_plugin(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:281)", "RUBY.register_plugins(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:292)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.register_plugins(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:292)", "RUBY.start_workers(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:301)", "RUBY.run(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:226)", "RUBY.start_pipeline(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/agent.rb:398)", "java.lang.Thread.run(java/lang/Thread.java:748)"]}

Logstash version: 5.5.1
Plugin version: 5.1.7
Kafka version: 0.10.2.0
Java version: 1.8.0_131
Ruby version: 2.0.0p648 (2015-12-16) [x86_64-linux]
OS: RedHat 7

Logstash output config:

kafka {
  topic_id => "acled-test-topic"
  bootstrap_servers => "<kafka1_host>:9092,<kafka2_host>:9092"
  security_protocol => "SASL_SSL"
  ssl_truststore_location => "<path_to>/kafka.client.truststore.jks"
  ssl_truststore_password => "MYPASS"
  ssl_keystore_location => "<path_to>/kafka.client.keystore.jks"
  ssl_keystore_password => "MYPASS"
  ssl_key_password => "MYPASS"
  kerberos_config => "/etc/krb5.conf"
  sasl_kerberos_service_name => "kafka"
  jaas_path => "<path_to>/kafka_client_jaas.conf"
  sasl_mechanism => "GSSAPI"
  client_id => "kafka"
}
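
Note: the output block above points at several files (truststore, keystore, JAAS file, krb5.conf). "Failed to construct kafka producer" is a generic wrapper, and one possible cause is that the user running Logstash cannot read one of them. The sketch below is only a quick preflight check under that assumption, not a confirmed diagnosis; check_readable is a hypothetical helper name.

```shell
#!/bin/sh
# check_readable (hypothetical helper): report, for each path given,
# whether the current user can read it.
# Returns 0 only when every file is readable.
check_readable() {
  status=0
  for f in "$@"; do
    if [ -r "$f" ]; then
      echo "ok: $f"
    else
      echo "NOT READABLE: $f"
      status=1
    fi
  done
  return "$status"
}
```

Run it as the same user that starts Logstash, passing /etc/krb5.conf and the real paths behind <path_to>/kafka_client_jaas.conf, <path_to>/kafka.client.truststore.jks and <path_to>/kafka.client.keystore.jks.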

kafka_client_jaas.conf:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
renewTicket=true
serviceName="kafka"
client=true;
};
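
Note: with useTicketCache=true and doNotPrompt=true and no keytab configured, Krb5LoginModule can only log in from a ticket that already exists in the cache of the user running Logstash. If there is no valid ticket, the JAAS login fails, which is one possible (unconfirmed) reason for the producer not being constructed. A minimal sketch of a check, assuming the MIT Kerberos client tools are installed; has_valid_ticket and the KLIST override variable are hypothetical names introduced here for illustration.

```shell
#!/bin/sh
# has_valid_ticket (hypothetical helper): succeeds only if the current
# user's Kerberos credential cache can be read and is not expired.
# Relies on "klist -s" (silent mode) from MIT Kerberos.
# KLIST is a hypothetical override for pointing at another klist binary.
has_valid_ticket() {
  "${KLIST:-klist}" -s
}
```

Run as the Logstash user, "has_valid_ticket || echo 'no valid ticket'" shows whether a kinit is needed before starting the pipeline.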

krb5.conf:

#Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
default_realm = DOMAIN_NAME
[realms]
DOMAIN_NAME = {
kdc = KAFKA1_HOST
admin_server = KAFKA1_HOST
}

Kafka broker security settings (server.properties):

######## Server Basics #######
#The id of the broker. This must be set to a unique integer for each broker.
broker.id=21

listeners=SASL_SSL://:9092
advertised.listeners=SASL_SSL://:9092
port=9092
log.dirs=<kafka-log_path>

ssl.key.password=MYPASS
ssl.keystore.location=<path_to>/kafka.server.keystore.jks
ssl.keystore.password=MYPASS
ssl.truststore.location=<path_to>/kafka.server.truststore.jks
ssl.truststore.password=MYPASS
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

allow.everyone.if.no.acl.found=true
principal.to.local.class=kafka.security.auth.KerberosPrincipalToLocal

super.users=User:kafka
kafka_principal_name=kafka/KAFKA1_HOST@DOMAIN_NAME
kafka_keytab=<path_to>/kafka.service.keytab

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
security.inter.broker.protocol=SASL_SSL

zookeeper.connect=KAFKA1_HOST:2181,KAFKA2_HOST:2181
zookeeper.connection.timeout.ms=25000
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000

Has anyone already encountered this issue? Am I missing some configuration?
Can anyone help me resolve it?

Thanks,
LC

I am also getting the same error. Please let me know if you found a workaround.
