Hello,
I have Logstash and Kafka deployed in an OpenShift cluster via operators (ECK for Logstash, AMQ Streams for Kafka). Logstash sends messages to Kafka without issue when the connection is unencrypted.
To enable mTLS, I have custom keys and certificates for both Kafka and Logstash, signed by different CAs. On the Logstash side, keystore.p12 contains its tls.key, tls.crt and ca.crt (all under the "logstash" alias), and truststore.p12 contains the certificate of the CA that signed Kafka's certificate (so that Logstash trusts the other side). Everything is set up analogously on the Kafka side, and both components are configured to use these PKCS12 stores.
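For reference, this is roughly how I sanity-check what actually ended up in both stores from the JVM side (the paths and the password variable are the ones my Logstash pipeline uses; the class itself is just an illustrative sketch):

import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import java.util.Collections;

// Quick sanity check: list what is inside keystore.p12 and truststore.p12.
public class CheckStores {
    static void dump(String path, char[] password) throws Exception {
        KeyStore ks = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream(path)) {
            ks.load(in, password);
        }
        System.out.println("== " + path);
        for (String alias : Collections.list(ks.aliases())) {
            X509Certificate cert = (X509Certificate) ks.getCertificate(alias);
            System.out.println("alias=" + alias
                    + " hasKey=" + ks.isKeyEntry(alias)
                    + " subject=" + cert.getSubjectX500Principal()
                    + " issuer=" + cert.getIssuerX500Principal());
        }
    }

    public static void main(String[] args) throws Exception {
        char[] pw = System.getenv("LS_KEYSTORE_TRUSTSTORE_PASSWD").toCharArray();
        dump("/usr/share/logstash/config/keystore.p12", pw);
        dump("/usr/share/logstash/config/truststore.p12", pw);
    }
}

The keystore should show the "logstash" alias with hasKey=true and my certificate chain, and the truststore should show the Kafka CA with hasKey=false.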
However, when Logstash starts the pipeline, I only get this:
[2024-08-21T15:56:22,645][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Initialize connection to node kafka-poc-kafka-bootstrap.my-namespace.svc:9093 (id: -1 rack: null) for sending metadata request
[2024-08-21T15:56:22,645][DEBUG][org.apache.kafka.clients.ClientUtils][listen-http-output-kafka] Resolved host kafka-poc-kafka-bootstrap.my-namespace.svc as 10.200.89.38
[2024-08-21T15:56:22,645][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Initiating connection to node kafka-poc-kafka-bootstrap.my-namespace.svc:9093 (id: -1 rack: null) using address kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38
[2024-08-21T15:56:22,645][DEBUG][org.apache.kafka.common.network.Selector][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[2024-08-21T15:56:22,647][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Completed connection to node -1. Fetching API versions.
[2024-08-21T15:56:22,658][DEBUG][org.apache.kafka.common.network.SslTransportLayer][listen-http-output-kafka] [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38:9093], selector=sun.nio.ch.EPollSelectorImpl@6cd9412d, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'kafka-poc-kafka-bootstrap.my-namespace.svc' peerPort 9093 peerPrincipal 'CN=kafka-poc-kafka-bootstrap.my-namespace.svc' protocol 'TLSv1.3' cipherSuite 'TLS_AES_256_GCM_SHA384'
[2024-08-21T15:56:22,658][DEBUG][org.apache.kafka.common.network.Selector][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Successfully authenticated with kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38
[2024-08-21T15:56:22,658][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Initiating API versions fetch from node -1.
[2024-08-21T15:56:22,658][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=10-130-3-105.my-namespace.pod.cluster.local, correlationId=41, headerVersion=2) and timeout 40000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.4.1')
[2024-08-21T15:56:22,659][INFO ][org.apache.kafka.common.network.Selector][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Failed re-authentication with kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38 (channelId=-1) (Failed to process post-handshake messages)
[2024-08-21T15:56:22,659][INFO ][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Node -1 disconnected.
[2024-08-21T15:56:22,659][ERROR][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Connection to node -1 (kafka-poc-kafka-bootstrap.my-namespace.svc/10.200.89.38:9093) failed authentication due to: Failed to process post-handshake messages
[2024-08-21T15:56:22,659][DEBUG][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Cancelled in-flight API_VERSIONS request with correlation id 41 due to node -1 being disconnected (elapsed time since creation: 1ms, elapsed time since send: 1ms, request timeout: 40000ms): ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.4.1')
[2024-08-21T15:56:22,659][WARN ][org.apache.kafka.clients.NetworkClient][listen-http-output-kafka] [Producer clientId=10-130-3-105.my-namespace.pod.cluster.local] Bootstrap broker kafka-poc-kafka-bootstrap.my-namespace.svc:9093 (id: -1 rack: null) disconnected
As you can see, the initial TLS handshake and authentication succeed, but the connection then fails on re-authentication ("Failed to process post-handshake messages") and the broker disconnects. I have no idea why. Here are my Kafka YAML and my Logstash pipeline config:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-poc
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    authorization:
      type: simple
    config:
      allow.everyone.if.no.acl.found: true
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: '3.4'
      inter.broker.protocol.version: '3.4'
    jvmOptions:
      '-Xms': 4G
      '-Xmx': 4G
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
        configuration:
          brokerCertChainAndKey:
            secretName: kafka-poc-kafka-bootstrap-tls
            certificate: tls.crt
            key: tls.key
      - name: route
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
    logging:
      type: inline
      loggers:
        rootLogger.level: INFO
        log4j.logger.kafka.controller: INFO
        log4j.logger.state.change.logger: INFO
    livenessProbe:
      failureThreshold: 5
      initialDelaySeconds: 60
      periodSeconds: 20
    readinessProbe:
      failureThreshold: 5
      initialDelaySeconds: 60
      periodSeconds: 20
    replicas: 3
    resources:
      limits:
        memory: 8Gi
      requests:
        memory: 4Gi
    storage:
      size: 15Gi
      type: persistent-claim
    tls:
      trustedCertificates:
        - secretName: ls-kafka-producer-cert
          certificate: ca.crt
    version: 3.4.0
  zookeeper:
    jvmOptions:
      '-Xms': 1G
      '-Xmx': 1G
    replicas: 3
    storage:
      size: 2Gi
      type: persistent-claim
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: kafka-poc
spec:
  config:
    retention.ms: 86400000
    segment.bytes: 1073741824
    max.message.bytes: 10485760
  partitions: 10
  replicas: 3
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: test-user
  labels:
    strimzi.io/cluster: kafka-poc
spec:
  authentication:
    type: tls
  authorization:
    acls:
      - resource:
          type: topic
          name: test-topic-tls
          patternType: literal
        operation: All
        host: '*'
    type: simple
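For completeness, the two Secrets referenced above (kafka-poc-kafka-bootstrap-tls for the broker listener and ls-kafka-producer-cert for the trusted client CA) look roughly like this. Contents are redacted, and the Secret types are simply how I would typically create them; what matters are the key names the Kafka CR points at:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-poc-kafka-bootstrap-tls
type: kubernetes.io/tls
data:
  tls.crt: <base64-encoded broker certificate chain>
  tls.key: <base64-encoded broker private key>
---
apiVersion: v1
kind: Secret
metadata:
  name: ls-kafka-producer-cert
type: Opaque
data:
  ca.crt: <base64-encoded certificate of the CA that signed the Logstash client certificate>

And here is my Logstash pipeline configuration: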
input {
  http {
    port => 8080
    ssl_enabled => true
    ssl_certificate => "/usr/share/logstash/openshift-tls/signed/tls.crt"
    ssl_key => "/usr/share/logstash/openshift-tls/signed/tls.key"
    ssl_certificate_authorities => ["/usr/share/logstash/openshift-tls/ca/service-ca.crt"]
    ssl_client_authentication => "optional"
  }
}
output {
  stdout { }
  kafka {
    codec => json
    topic_id => "test-topic"
    message_key => "message"
    bootstrap_servers => "kafka-poc-kafka-bootstrap.my-namespace.svc:9093"
    security_protocol => "SSL"
    ssl_keystore_location => "/usr/share/logstash/config/keystore.p12"
    ssl_keystore_password => "${LS_KEYSTORE_TRUSTSTORE_PASSWD}"
    ssl_keystore_type => "PKCS12"
    ssl_truststore_location => "/usr/share/logstash/config/truststore.p12"
    ssl_truststore_password => "${LS_KEYSTORE_TRUSTSTORE_PASSWD}"
    ssl_truststore_type => "PKCS12"
    client_id => "${IP_POD}.my-namespace.pod.cluster.local"
  }
}
where IP_POD contains the IP of the OpenShift pod with the dots replaced by hyphens, according to the documentation (e.g. 10.130.3.105 becomes 10-130-3-105.my-namespace.pod.cluster.local, which matches the clientId in the logs above). The Logstash certificate also contains the SAN "*.my-namespace.pod.cluster.local".
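To take Logstash and the Kafka client library out of the picture, my next step is a bare TLS handshake against the bootstrap address using the same two stores, along the lines of the sketch below (hostname, paths and password variable are from my setup; the rest is illustrative). As far as I understand TLS 1.3, the client-side handshake completes before the broker has validated the client certificate, so a rejection only surfaces afterwards, which would be consistent with the "Failed to process post-handshake messages" error above.

import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.*;

// Minimal mTLS check against the Kafka bootstrap service using the same
// keystore/truststore as the Logstash kafka output. Purely a debugging sketch.
public class HandshakeCheck {
    public static void main(String[] args) throws Exception {
        char[] pw = System.getenv("LS_KEYSTORE_TRUSTSTORE_PASSWD").toCharArray();

        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream("/usr/share/logstash/config/keystore.p12")) {
            keyStore.load(in, pw);
        }
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, pw);

        KeyStore trustStore = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream("/usr/share/logstash/config/truststore.p12")) {
            trustStore.load(in, pw);
        }
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        SSLContext ctx = SSLContext.getInstance("TLSv1.3");
        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        try (SSLSocket socket = (SSLSocket) ctx.getSocketFactory()
                .createSocket("kafka-poc-kafka-bootstrap.my-namespace.svc", 9093)) {
            socket.setSoTimeout(5000);
            // With TLS 1.3 this returns once the client side is done; the broker
            // may not have finished validating our client certificate yet.
            socket.startHandshake();
            SSLSession s = socket.getSession();
            System.out.println("handshake ok: " + s.getProtocol() + " " + s.getCipherSuite()
                    + " peer=" + s.getPeerPrincipal());
            // A read forces processing of whatever the broker sent after the handshake.
            // A SocketTimeoutException here would mean the broker kept the connection open;
            // an SSLException with an alert would point at the broker rejecting our client cert.
            socket.getInputStream().read();
        }
    }
}

If that doesn't reveal anything, I plan to set LS_JAVA_OPTS to "-Djavax.net.debug=ssl:handshake" on the Logstash container to get the raw TLS alerts.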
I really don't know what is going wrong. Any help is much appreciated. Thank you.