Logstash Kafka output fails on TLS post-handshake messages

Hello,

I have Logstash and Kafka deployed in an OpenShift cluster via operators (ECK for Logstash and AMQ Streams for Kafka). Logstash sends messages to Kafka without issue when the connection is unencrypted.

To enable mTLS, I created custom keys and certificates for both Kafka and Logstash. The certificates are signed by different CAs. On the Logstash side, keystore.p12 contains its tls.key, tls.crt and ca.crt (all under the "logstash" alias), and truststore.p12 contains the certificate of the CA that signed Kafka's certificate, so that Logstash trusts the other side. The Kafka side is set up analogously, and both components are configured to use these PKCS12 stores.

However, when Logstash starts the pipeline, I only get this:

[2024-08-21T15:56:22,645][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Initialize connection to node kafka-poc-kafka-bootstrap.my-namespace.svc:9093 (id: -1 rack: null) for sending metadata request
[2024-08-21T15:56:22,645][DEBUG][org.apache.kafka.clients.ClientUtils][listen-http-output-kafka] Resolved host kafka-poc-kafka-bootstrap.my-namespace.svc as 10.200.89.38
[2024-08-21T15:56:22,645][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Initiating connection to node kafka-poc-kafka-bootstrap.my-namespace.svc:9093 (id: -1 rack: null) using address kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38
[2024-08-21T15:56:22,645][DEBUG][org.apache.kafka.common.network.Selector][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[2024-08-21T15:56:22,647][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Completed connection to node -1. Fetching API versions.
[2024-08-21T15:56:22,658][DEBUG][org.apache.kafka.common.network.SslTransportLayer][listen-http-output-kafka] [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38:9093], selector=sun.nio.ch.EPollSelectorImpl@6cd9412d, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'kafka-poc-kafka-bootstrap.my-namespace.svc' peerPort 9093 peerPrincipal 'CN=kafka-poc-kafka-bootstrap.my-namespace.svc' protocol 'TLSv1.3' cipherSuite 'TLS_AES_256_GCM_SHA384'
[2024-08-21T15:56:22,658][DEBUG][org.apache.kafka.common.network.Selector][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Successfully authenticated with kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38
[2024-08-21T15:56:22,658][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Initiating API versions fetch from node -1.
[2024-08-21T15:56:22,658][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=10-130-3-105.my-namespace.pod.cluster.local, correlationId=41, headerVersion=2) and timeout 40000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.4.1')
[2024-08-21T15:56:22,659][INFO ][org.apache.kafka.common.network.Selector][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Failed re-authentication with kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38 (channelId=-1) (Failed to process post-handshake messages)
[2024-08-21T15:56:22,659][INFO ][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Node -1 disconnected.
[2024-08-21T15:56:22,659][ERROR][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Connection to node -1 (kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38:9093) failed authentication due to: Failed to process post-handshake messages
[2024-08-21T15:56:22,659][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Cancelled in-flight API_VERSIONS request with correlation id 41 due to node -1 being disconnected (elapsed time since creation: 1ms, elapsed time since send: 1ms, request timeout: 40000ms): ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.4.1')
[2024-08-21T15:56:22,659][WARN ][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Bootstrap broker kafka-poc-kafka-bootstrap.my-namespace.svc:9093 (id: -1 rack: null) disconnected
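
To make the timing in the log above explicit, here is a minimal Python sketch that computes how quickly the failure follows the successful handshake. The timestamps are copied from the log lines above; the event labels and the helper names (parse_ts, elapsed_ms) are my own and purely illustrative.

```python
from datetime import datetime, timedelta

# Timestamps copied from the log excerpt; the keys are illustrative labels.
EVENTS = {
    "connection_initiated": "2024-08-21T15:56:22,645",
    "handshake_completed": "2024-08-21T15:56:22,658",
    "api_versions_sent": "2024-08-21T15:56:22,658",
    "post_handshake_failure": "2024-08-21T15:56:22,659",
}


def parse_ts(value: str) -> datetime:
    """Parse a Logstash-style timestamp with comma-separated milliseconds."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S,%f")


def elapsed_ms(start: str, end: str) -> float:
    """Milliseconds between two labelled events."""
    delta = parse_ts(EVENTS[end]) - parse_ts(EVENTS[start])
    return delta / timedelta(milliseconds=1)


print(elapsed_ms("connection_initiated", "handshake_completed"))
print(elapsed_ms("handshake_completed", "post_handshake_failure"))
```

In other words, the disconnect arrives about a millisecond after the handshake is reported as complete and the first API_VERSIONS request is sent.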

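As the log shows, the TLS handshake and the initial authentication succeed, but the connection then fails with "Failed to process post-handshake messages" (reported as a failed re-authentication) and the client is disconnected. I have no idea why. Here are my Kafka YAML and my Logstash pipeline config: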

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-poc
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    authorization:
      type: simple
    config:
      allow.everyone.if.no.acl.found: true
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: '3.4'
      inter.broker.protocol.version: '3.4'
    jvmOptions:
      '-Xms': 4G
      '-Xmx': 4G
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
        configuration:
          brokerCertChainAndKey:
            secretName: kafka-poc-kafka-bootstrap-tls
            certificate: tls.crt
            key: tls.key
      - name: route
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
    logging:
      type: inline
      loggers:
        rootLogger.level: INFO
        log4j.logger.kafka.controller: INFO
        log4j.logger.state.change.logger: INFO
    livenessProbe:
      failureThreshold: 5
      initialDelaySeconds: 60
      periodSeconds: 20
    readinessProbe:
      failureThreshold: 5
      initialDelaySeconds: 60
      periodSeconds: 20
    replicas: 3
    resources:
      limits:
        memory: 8Gi
      requests:
        memory: 4Gi
    storage:
      size: 15Gi
      type: persistent-claim
    tls:
      trustedCertificates:
        - secretName: ls-kafka-producer-cert
          certificate: ca.crt
    version: 3.4.0
  zookeeper:
    jvmOptions:
      '-Xms': 1G
      '-Xmx': 1G
    replicas: 3
    storage:
      size: 2Gi
      type: persistent-claim
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: kafka-poc
spec:
  config:
    retention.ms: 86400000
    segment.bytes: 1073741824
    max.message.bytes: 10485760
  partitions: 10
  replicas: 3
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: test-user
  labels:
    strimzi.io/cluster: kafka-poc
spec:
  authentication:
    type: tls
  authorization:
    acls:
      - resource:
          type: topic
          name: test-topic-tls
          patternType: literal
        operation: All
        host: '*'
    type: simple

input {
  http {
    port => 8080
    ssl_enabled => true
    ssl_certificate => "/usr/share/logstash/openshift-tls/signed/tls.crt"
    ssl_key => "/usr/share/logstash/openshift-tls/signed/tls.key"
    ssl_certificate_authorities => ["/usr/share/logstash/openshift-tls/ca/service-ca.crt"]
    ssl_client_authentication => "optional"
  }
}
output {
  stdout { }
  kafka {
    codec => json
    topic_id => "test-topic"
    message_key => "message"
    bootstrap_servers => "kafka-poc-kafka-bootstrap.my-namespace.svc:9093"
    security_protocol => "SSL"
    ssl_keystore_location => "/usr/share/logstash/config/keystore.p12"
    ssl_keystore_password => "${LS_KEYSTORE_TRUSTSTORE_PASSWD}"
    ssl_keystore_type => "PKCS12"
    ssl_truststore_location => "/usr/share/logstash/config/truststore.p12"
    ssl_truststore_password => "${LS_KEYSTORE_TRUSTSTORE_PASSWD}"
    ssl_truststore_type => "PKCS12"
    client_id => "${IP_POD}.my-namespace.pod.cluster.local"
  }
}

where IP_POD contains the IP of the OpenShift pod with dots replaced by hyphens, according to the documentation (the Logstash certificate also contains the SAN "*.my-namespace.pod.cluster.local").
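
To illustrate how the client_id is derived and why I expect it to be covered by the wildcard SAN, here is a minimal Python sketch. The function names are mine and purely illustrative, the pod IP 10.130.3.105 is inferred from the clientId in the log, and the wildcard check is a simplified single-label comparison, not the full hostname verification a TLS library performs.

```python
def pod_dns_name(pod_ip: str, namespace: str) -> str:
    """Build the pod DNS name: the pod IP with dots replaced by hyphens."""
    return f"{pod_ip.replace('.', '-')}.{namespace}.pod.cluster.local"


def matches_wildcard_san(hostname: str, san: str) -> bool:
    """Simplified check: a leading '*' covers exactly one leftmost label."""
    host_labels = hostname.lower().split(".")
    san_labels = san.lower().split(".")
    if len(host_labels) != len(san_labels):
        return False
    if san_labels[0] == "*":
        # Compare everything after the wildcard label.
        return host_labels[1:] == san_labels[1:]
    return host_labels == san_labels


# Assumed pod IP, taken from the clientId seen in the log.
client_id = pod_dns_name("10.130.3.105", "my-namespace")
print(client_id)
print(matches_wildcard_san(client_id, "*.my-namespace.pod.cluster.local"))
```

With these values the derived name is 10-130-3-105.my-namespace.pod.cluster.local, which matches the clientId in the log, and the simplified check reports that the wildcard SAN covers it.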

I really don't know what is going wrong. Any help is much appreciated. Thank you.