Issues configuring the Kafka input plugin with TLS

Description of the problem:

I'm having trouble integrating Logstash with a Kafka cluster that is configured with TLS.
I get the following exception when I supply PKCS12-format TLS certificates to the Kafka input plugin.

  1. Logstash version - 8.8.1
  2. Logstash installation source - Bitnami Logstash Helm chart
  3. How Logstash is being run - Kubernetes StatefulSet
  4. Kafka input plugin - as documented in the Logstash Reference [8.8]

Error:

```
Unable to create Kafka consumer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka consumer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/config/certs/ample-user/p12 of type PKCS12>}
[2023-07-05T16:16:33,533][ERROR][logstash.javapipeline ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] A plugin had an unrecoverable error. Will restart this plugin.
```


Helm values:

extraEnvVars:
  - name: KAFKA_HOST
    value: "kafka-kafka-bootstrap"
  - name: KAFKA_PORT
    value: "9093"

extraVolumeMounts:
  - name: kafka-cluster-certs
    mountPath: /usr/share/logstash/config/certs/kafka-cluster
    readOnly: true
  - name: ample-user
    mountPath: /usr/share/logstash/config/certs/ample-user
    readOnly: true

extraVolumes: 
  - name: kafka-cluster-certs
    secret:
      secretName: kafka-cluster-ca-cert
      items:
      - key: ca.p12
        path: p12
      - key: ca.password
        path: password
  - name: ample-user
    secret:
      secretName: ample-user
      items:
      - key: user.p12
        path: p12
      - key: user.password
        path: password

input: |-
  kafka {
    bootstrap_servers => ["${KAFKA_HOST}:${KAFKA_PORT}"]
    topics => ["fluent-log-vault"]
    type => "vault"
    security_protocol => "SSL"
    ssl_truststore_type => "PKCS12"
    ssl_keystore_type => "PKCS12"
    ssl_truststore_location => "/usr/share/logstash/config/certs/kafka-cluster/p12"
    ssl_truststore_password => "/usr/share/logstash/config/certs/password"
    ssl_keystore_location => "/usr/share/logstash/config/certs/ample-user/p12"
    ssl_keystore_password => "/usr/share/logstash/config/certs/ample-user/password"
  }
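
One thing that may be worth checking in the values above: as far as I understand the plugin documentation, `ssl_keystore_password` and `ssl_truststore_password` take the password value itself, not the path to a file that contains it, so the paths would be used as literal passwords. The truststore password path also omits the `kafka-cluster` directory. Below is a minimal sketch of one alternative, assuming the chart passes `extraEnvVars` entries through to the container unchanged. The variable names `KAFKA_TRUSTSTORE_PASSWORD` and `KAFKA_KEYSTORE_PASSWORD` are hypothetical and not part of the original setup. The secret names and keys are the ones already used in `extraVolumes`.

```yaml
extraEnvVars:
  - name: KAFKA_HOST
    value: "kafka-kafka-bootstrap"
  - name: KAFKA_PORT
    value: "9093"
  # Hypothetical variable names; the values are read from the same
  # secrets that are mounted as volumes above.
  - name: KAFKA_TRUSTSTORE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: kafka-cluster-ca-cert
        key: ca.password
  - name: KAFKA_KEYSTORE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: ample-user
        key: user.password

input: |-
  kafka {
    bootstrap_servers => ["${KAFKA_HOST}:${KAFKA_PORT}"]
    topics => ["fluent-log-vault"]
    type => "vault"
    security_protocol => "SSL"
    ssl_truststore_type => "PKCS12"
    ssl_keystore_type => "PKCS12"
    ssl_truststore_location => "/usr/share/logstash/config/certs/kafka-cluster/p12"
    # Password values are substituted from the environment instead of file paths.
    ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
    ssl_keystore_location => "/usr/share/logstash/config/certs/ample-user/p12"
    ssl_keystore_password => "${KAFKA_KEYSTORE_PASSWORD}"
  }
```

This relies on the same `${VAR}` substitution already used for `KAFKA_HOST` and `KAFKA_PORT`. The volume mounts are still needed for the `p12` files themselves.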

Detailed Logs:

[2023-07-05T16:16:33,529][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Metrics scheduler closed
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Closing reporter org.apache.kafka.common.metrics.JmxReporter
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Metrics reporters closed
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] App info kafka.consumer for logstash-0 unregistered
[2023-07-05T16:16:33,531][ERROR][logstash.inputs.kafka ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Unable to create Kafka consumer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka consumer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/config/certs/ample-user/p12 of type PKCS12>}
[2023-07-05T16:16:33,533][ERROR][logstash.javapipeline ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::Kafka ssl_keystore_location=>"/usr/share/logstash/config/certs/ample-user/p12", ssl_keystore_password=><password>, topics=>["fluent-log-vault"], ssl_truststore_location=>"/usr/share/logstash/config/certs/kafka-cluster/p12", ssl_truststore_password=><password>, security_protocol=>"SSL", id=>"0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef", type=>"vault", ssl_truststore_type=>"PKCS12", bootstrap_servers=>"kafka-kafka-bootstrap:9093", ssl_keystore_type=>"PKCS12", codec=><LogStash::Codecs::Plain id=>"plain_c97407f8-f507-4c16-a8a6-8cc822b8e0eb", enable_metric=>true, charset=>"UTF-8">, enable_metric=>true, connections_max_idle_ms=>540000, metadata_max_age_ms=>300000, request_timeout_ms=>40000, schema_registry_ssl_keystore_type=>"jks", schema_registry_ssl_truststore_type=>"jks", schema_registry_validation=>"auto", auto_commit_interval_ms=>5000, check_crcs=>true, client_dns_lookup=>"use_all_dns_ips", client_id=>"logstash", consumer_threads=>1, enable_auto_commit=>true, fetch_max_bytes=>52428800, fetch_max_wait_ms=>500, group_id=>"logstash", heartbeat_interval_ms=>3000, isolation_level=>"read_uncommitted", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", max_poll_interval_ms=>300000, max_partition_fetch_bytes=>1048576, max_poll_records=>500, receive_buffer_bytes=>32768, reconnect_backoff_ms=>50, retry_backoff_ms=>100, send_buffer_bytes=>131072, session_timeout_ms=>10000, value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", sasl_mechanism=>"GSSAPI", decorate_events=>"none">
Error: Failed to construct kafka consumer
Exception: Java::OrgApacheKafkaCommon::KafkaException
Stack: org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:830)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:666)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:647)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:627)
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(jdk/internal/reflect/NativeConstructorAccessorImpl.java:77)
jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(jdk/internal/reflect/DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstanceWithCaller(java/lang/reflect/Constructor.java:499)
java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:480)
org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:269)
org.jruby.RubyClass.new(org/jruby/RubyClass.java:890)
org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)
RUBY.create_consumer(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:477)
RUBY.run(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:297)
org.jruby.RubyEnumerable$18.call(org/jruby/RubyEnumerable.java:815)
org.jruby.RubyEnumerator$1.call(org/jruby/RubyEnumerator.java:400)
org.jruby.RubyFixnum.times(org/jruby/RubyFixnum.java:308)
org.jruby.RubyInteger$INVOKER$i$0$0$times.call(org/jruby/RubyInteger$INVOKER$i$0$0$times.gen)
org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:514)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:502)
org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:387)
org.jruby.RubyEnumerator.each(org/jruby/RubyEnumerator.java:396)
org.jruby.RubyEnumerator.each(org/jruby/RubyEnumerator.java:392)
org.jruby.RubyEnumerator$INVOKER$i$each.call(org/jruby/RubyEnumerator$INVOKER$i$each.gen)
org.jruby.RubyEnumerable.collectCommon(org/jruby/RubyEnumerable.java:807)
org.jruby.RubyEnumerable.map(org/jruby/RubyEnumerable.java:799)
org.jruby.RubyEnumerable$INVOKER$s$0$0$map.call(org/jruby/RubyEnumerable$INVOKER$s$0$0$map.gen)
RUBY.run(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:295)
RUBY.inputworker(/opt/bitnami/logstash/logstash-core/lib/logstash/java_pipeline.rb:414)
RUBY.start_input(/opt/bitnami/logstash/logstash-core/lib/logstash/java_pipeline.rb:405)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:309)
java.lang.Thread.run(java/lang/Thread.java:833)


Is there any way to solve this problem?
