Hi,
I want to produce logs to a Kafka cluster secured with SASL/Kerberos. The cluster itself works and has been tested with kafka-console-producer, but with my Logstash config the pipeline fails with this error:
[ERROR][logstash.outputs.kafka ] Unable to create Kafka producer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer}
[ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:335)", "org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:188)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "RUBY.create_producer(/usr/local/src/ELK/logstash-5.5.1/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:242)", "RUBY.register(/usr/local/src/ELK/logstash-5.5.1/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:178)", "RUBY.register(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:9)", "RUBY.register(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/output_delegator.rb:41)", "RUBY.register_plugin(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:281)", "RUBY.register_plugins(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:292)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.register_plugins(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:292)", "RUBY.start_workers(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:301)", "RUBY.run(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:226)", "RUBY.start_pipeline(/usr/local/src/ELK/logstash-5.5.1/logstash-core/lib/logstash/agent.rb:398)", "java.lang.Thread.run(java/lang/Thread.java:748)"]}
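From what I can tell, "Failed to construct kafka producer" wraps a more specific root cause (bad JAAS entry, unreadable keystore, Kerberos/DNS failure). If useful, a debug run along these lines should surface it; a sketch, assuming the --log.level flag available in Logstash 5.x, with a placeholder pipeline path:

# Re-run the pipeline with debug logging so the wrapped cause is printed in full
/usr/local/src/ELK/logstash-5.5.1/bin/logstash -f <path_to>/pipeline.conf --log.level=debug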
Logstash version: 5.5.1
Plugin version: 5.1.7
Kafka version: 0.10.2.0
Java version: 1.8.0_131
Ruby version: 2.0.0p648 (2015-12-16) [x86_64-linux]
OS: RedHat 7
Logstash output config:
kafka {
  topic_id => "acled-test-topic"
  bootstrap_servers => "<kafka1_host>:9092,<kafka2_host>:9092"
  security_protocol => "SASL_SSL"
  ssl_truststore_location => "<path_to>/kafka.client.truststore.jks"
  ssl_truststore_password => "MYPASS"
  ssl_keystore_location => "<path_to>/kafka.client.keystore.jks"
  ssl_keystore_password => "MYPASS"
  ssl_key_password => "MYPASS"
  kerberos_config => "/etc/krb5.conf"
  sasl_kerberos_service_name => "kafka"
  jaas_path => "<path_to>/kafka_client_jaas.conf"
  sasl_mechanism => "GSSAPI"
  client_id => "kafka"
}
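As a sanity check, both JKS stores should open with the passwords configured above when read by the user running Logstash; a quick check with the JDK's keytool (paths as in my config), which fails fast if a store path or password is wrong:

keytool -list -keystore <path_to>/kafka.client.truststore.jks -storepass MYPASS
keytool -list -keystore <path_to>/kafka.client.keystore.jks -storepass MYPASS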
kafka_client_jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useTicketCache=true
  renewTicket=true
  serviceName="kafka"
  client=true;
};
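As I understand it, with useTicketCache=true and no keytab this entry only authenticates if the user running Logstash already holds a valid TGT in its ticket cache. For reference, a keytab-based variant that avoids depending on the cache would look like the sketch below; the keytab path and principal are placeholders, not from my setup:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="<path_to>/logstash.keytab"
  principal="logstash@DOMAIN_NAME"
  serviceName="kafka";
};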
krb5.conf:
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 default_realm = DOMAIN_NAME

[realms]
 DOMAIN_NAME = {
  kdc = KAFKA1_HOST
  admin_server = KAFKA1_HOST
 }
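Because the JAAS entry relies on the ticket cache, the ticket can be verified on the Logstash host against this krb5.conf before starting the pipeline; the principal below is a placeholder:

# Obtain a TGT for the realm above and confirm it is in the cache
kinit <user>@DOMAIN_NAME
klist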
Kafka broker server.properties (security-relevant settings):
######## Server Basics #######
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=21
listeners=SASL_SSL://:9092
advertised.listeners=SASL_SSL://:9092
port=9092
log.dirs=<kafka-log_path>
ssl.key.password=MYPASS
ssl.keystore.location=<path_to>/kafka.server.keystore.jks
ssl.keystore.password=MYPASS
ssl.truststore.location=<path_to>/kafka.server.truststore.jks
ssl.truststore.password=MYPASS
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
allow.everyone.if.no.acl.found=true
principal.to.local.class=kafka.security.auth.KerberosPrincipalToLocal
super.users=User:kafka
kafka_principal_name=kafka/KAFKA1_HOST@DOMAIN_NAME
kafka_keytab=<path_to>/kafka.service.keytab
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
security.inter.broker.protocol=SASL_SSL
zookeeper.connect=KAFKA1_HOST:2181,KAFKA2_HOST:2181
zookeeper.connection.timeout.ms=25000
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000
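To rule out the broker side, the same SASL_SSL settings can be exercised from the Logstash host with the console producer; a sketch, assuming Kafka 0.10.x's --producer.config option and a hypothetical client-sasl.properties that mirrors the Logstash output settings above:

# client-sasl.properties (hypothetical file)
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
ssl.truststore.location=<path_to>/kafka.client.truststore.jks
ssl.truststore.password=MYPASS
ssl.keystore.location=<path_to>/kafka.client.keystore.jks
ssl.keystore.password=MYPASS
ssl.key.password=MYPASS

# Point the console producer at the same JAAS and krb5 files, then send a test message
export KAFKA_OPTS="-Djava.security.auth.login.config=<path_to>/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"
bin/kafka-console-producer.sh --broker-list <kafka1_host>:9092,<kafka2_host>:9092 \
  --topic acled-test-topic --producer.config client-sasl.properties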
Has anyone encountered this issue before? Am I missing some configuration?
Any help resolving this would be appreciated.
Thanks,
LC