Error return code 21: SSL encryption between filebeat 5.2 and kafka 1.0 (self-signed)

I am trying to set up an SSL connection between filebeat 5.2 and kafka 1.0 using the steps below. I am fairly new to the encryption world and am seeing errors during this process. I am not sure if I am missing something in the cert/key or in the filebeat and kafka configs. I would appreciate some guidance.

Filebeat: v5.2 residing on Debian 8 (Jessie)
Kafka: v1.0 residing on Debian 7 (Wheezy)

Sequence followed

Log in to the common node

cat > cert_info_filebeat << EOF
[req]
default_bits = 2048
prompt = no
default_md = sha512
req_extensions = req_ext
distinguished_name = dn

[ dn ]
C=US
ST=<xxx>
L=<xxx>
O=<xxx>
OU=<xxx>
emailAddress=<xxx@xx.com>
CN = filebeat

[ req_ext ]
keyUsage = keyEncipherment, dataEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names

[ alt_names ]
IP.1 = <filebeat IP 1>
IP.2 = <filebeat IP 2>
IP.3 = <filebeat IP 3> 

[ usr_cert ]
# Extensions for server certificates.
basicConstraints = CA:FALSE
nsCertType = client, server
nsComment = "OpenSSL FileBeat Server / Client Certificate"
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer:always
keyUsage = critical, digitalSignature, keyEncipherment, keyAgreement, nonRepudiation
extendedKeyUsage = serverAuth, clientAuth

EOF

Run these commands:
Generate the key
openssl genrsa -out filebeat.key 2048
Generate the certificate signing request
openssl req -new -out filebeat.csr -key filebeat.key -config <(cat cert_info_filebeat)
Generate the self-signed certificate
openssl x509 -req -days 3650 -in filebeat.csr -signkey filebeat.key -out filebeat.crt -extensions req_ext -extfile cert_info_filebeat

After this I have the filebeat.key, filebeat.csr, and filebeat.crt files. I ran this command to verify the crt, and there were no errors:

openssl x509 -in filebeat.crt -text -noout
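
To specifically confirm that the SAN entries made it into the signed certificate, this more targeted check can be used (filenames as above):

openssl x509 -in filebeat.crt -noout -text | grep -A1 "Subject Alternative Name"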

Modify filebeat config

output.kafka:
  hosts: ["<kafka IP>:9093"]
  topic: '%{[type]}'
  # ssl crt/key files
  ssl.certificate: "/etc/filebeat_softlayer.crt"
  ssl.key: "/etc/filebeat_softlayer.key"
  # ssl.verification_mode: none  (turn off ssl domain/hostname verification)
  compression: gzip

Kafka Server

cat > cert_info_kafka << EOF
[req]
default_bits = 2048
prompt = no
default_md = sha512
req_extensions = req_ext
distinguished_name = dn

[ dn ]
C=US
ST=<xxx>
L=<xxx>
O=<xxx>
OU=<xxx>
emailAddress=<xxx>
CN = kafka

[ req_ext ]
keyUsage = keyEncipherment, dataEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names

[ alt_names ]
IP.1 = <kafka IP>

[ usr_cert ]
# Extensions for server certificates.
basicConstraints = CA:FALSE
nsCertType = client, server
nsComment = "OpenSSL Kafka Server / Client Certificate"
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer:always
keyUsage = critical, digitalSignature, keyEncipherment, keyAgreement, nonRepudiation
extendedKeyUsage = serverAuth, clientAuth

EOF

Generate key

openssl genrsa -out kafka.key 2048

Generate the csr
openssl req -new -out kafka.csr -key kafka.key -config <(cat cert_info_kafka)

Inspect the csr
openssl req -text -noout -in kafka.csr

Generate the cert
openssl x509 -req -days 3650 -in kafka.csr -signkey kafka.key -out kafka.crt -extensions req_ext -extfile cert_info_kafka

Generate a pkcs12 file so we can import the crt and key to create the kafka keystore file
openssl pkcs12 -export -in kafka.crt -inkey kafka.key -out kafka.p12 -name kafka-pk12

cd /opt/obuildfactory/jdk-1.8.0-openjdk-x86_64/jre/bin/
./keytool -importkeystore -deststorepass <password> -destkeystore kafka-keystore.jks -srckeystore kafka.p12 -srcstoretype PKCS12
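
As a sanity check, the imported entry can be listed to confirm the key landed in the keystore (same password as above):

./keytool -list -v -keystore kafka-keystore.jks -storepass <password>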

Modify kafka config
# Listener List - Comma-separated list of URIs we will listen on and the listener names.
# If the listener name is not a security protocol, listener.security.protocol.map must also be set.
# Specify hostname as 0.0.0.0 to bind to all interfaces. Leave hostname empty to bind to default interface.
# Examples of legal listener lists: PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
#listeners=PLAINTEXT://172.16.0.110:9092
listeners=ONSL://:9092,INSL://:9093

# advertised listener part
advertised.listeners=ONSL://<kafka IP>:9092,INSL://<kafka IP>:9093
inter.broker.listener.name=INSL
listener.security.protocol.map=INSL:SSL,ONSL:PLAINTEXT

# ssl encryption config from docs
ssl.keystore.location=/opt/obuildfactory/jdk-1.8.0-openjdk-x86_64/jre/bin/kafka-keystore.jks
ssl.keystore.password=<password>

Issues:

When I start the kafka process on the kafka node, the log shows the error below, but the process and ports are still up and running:
[2018-02-08 15:09:16,060] ERROR [Controller id=1, targetBrokerId=1] Connection to node 1 failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)

Filebeat error

2018-02-07T20:35:04Z WARN client/metadata fetching metadata for all topics from broker <kafka IP>:9093

2018-02-07T20:35:04Z WARN Failed to connect to broker <kafka IP>:9093: x509: certificate signed by unknown authority

2018-02-07T20:35:04Z WARN kafka message: client/metadata got error from broker while fetching metadata:%!(EXTRA x509.UnknownAuthorityError=x509: certificate signed by unknown authority)
2018-02-07T20:35:04Z WARN kafka message: client/metadata no available broker to send metadata request to
2018-02-07T20:35:04Z WARN client/brokers resurrecting 1 dead seed brokers
2018-02-07T20:35:04Z WARN client/metadata retrying after 250ms... (3 attempts remaining)

2018-02-07T20:35:05Z WARN client/metadata fetching metadata for all topics from broker <kafka IP>:9093

When I run this command from the filebeat node

openssl s_client -connect <kafka IP>:9093 -showcerts

It shows this message

verify error:num=21:unable to verify the first certificate
verify return:1

Please advise on what I am missing and what the steps are to enable SSL between filebeat and kafka in a local environment.

Thanks,
Gangadhar

If you are not disabling ssl.verification in filebeat, then you need to give it the server certificate. I run TLS with client authentication between filebeat and kafka (not self-signed; I use our corporate CA). The filebeat side looks like this:

  # List of root certificates for HTTPS server verifications
  ssl.certificate_authorities: ["/etc/somewhere/CorpRoot.pem"]

  # Certificate for SSL client authentication
  ssl.certificate: "/etc/somewhere/NameMatchedServerCert.pem"

  # Client Certificate Key
  ssl.key: "/etc/somewhere/NameMatchedServerCert.key"
  ssl.key_passphrase: "ThatWasChanged"

The kafka side looks like this:

ssl.keystore.location=/etc/somewhere/KeyStore.jks
ssl.keystore.password=IChangedIt
ssl.key.password=ThatWasChanged
ssl.truststore.location=/etc/somewhere/TrustStore.jks
ssl.truststore.password=IChangedIt
ssl.client.auth=required

If you have ssl.client.auth set to none then you don't need a truststore for kafka and just need the certificate_authorities for filebeat.

But what about self-signed certificates? Do you think I have the right files in my configuration? I don't have a .pem file on either the kafka or filebeat nodes; I have .csr, .crt, and .key files instead.

Self-signed certificates are signed by you. That is, you generated a CA cert at some point. Unless you have configured that in your system's default truststore (which to me would be crazy), you have to tell filebeat about it. The documentation here shows a pem file, and my guess is it has to be pem. Do the conversion using openssl.
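
Something along these lines should do it (input/output names here are placeholders for whatever your CA cert file is called):

openssl x509 -in my-ca.crt -out my-ca.pem -outform PEM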

On the kafka side, assuming you are not doing client auth, you need a jks and the keystore and certificate passwords.

ssl.keystore.location=/etc/somewhere/KeyStore.jks
ssl.keystore.password=IChangedIt
ssl.key.password=ThisWasChangedToo

Thanks Badger. I tried converting the crt to a pem file and made the config changes. Unfortunately, no luck. If you had to do this in a local environment where filebeat and kafka reside on different clusters, what sequence of steps would you suggest? Maybe I am not doing it the right way? Any pointers are greatly appreciated. I have listed the sequence of steps I tried in my earlier post.

Your original post shows ssl.verification_mode is commented out, so the kafka cert name has to match what you have in advertised.listeners, which is ONSL://:9092,INSL://:9093. If you really have IPs there, the cert name has to be the IP.

First see if kafka is accepting requests using openssl. If kafka is broken then you have a kafka problem, not an Elastic problem, so you might want to seek help elsewhere. If that end looks OK, disable verification_mode in filebeat. If that fixes things, generate a new cert (using the same CA) that matches the entry in advertised.listeners (I would recommend using a hostname rather than an IP). If that works in filebeat, re-enable verification_mode and you have a working system.

Do not run with verification_mode set to none except for testing.
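
For that first openssl check, something like this (substituting your broker address) shows whether the broker is serving TLS at all and what certificate chain it presents:

openssl s_client -connect <kafka IP>:9093 -showcerts < /dev/null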

Running kafka with self-signed certificates can be quite a pain, as you will need one certificate per kafka node and will need to configure beats with each individual node's certificate. If possible, try to run kafka with CAs.
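
A rough sketch of what that could look like with plain openssl, reusing the CSRs generated earlier in this thread (filenames are assumptions):

# Create a local CA once
openssl req -new -x509 -nodes -keyout ca.key -out ca.crt -days 3650 -subj "/CN=LocalTestCA"
# Sign each node's CSR with that CA instead of self-signing
openssl x509 -req -in kafka.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out kafka.crt -days 3650 -extensions req_ext -extfile cert_info_kafka
openssl x509 -req -in filebeat.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out filebeat.crt -days 3650 -extensions req_ext -extfile cert_info_filebeat

Beats then only needs ca.crt (in pem format) in ssl.certificate_authorities, no matter how many brokers you add.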

While one can use IPs with certificates, you should consider using domain names. I'm not even sure multiple IPs via alt_names are supported by the Go TLS libs.

This gist creates a trust chain and certificates for localhost (the openssl configs are only slightly modified). It creates jks files for use with kafka and pem files for use with beats. For conversion between formats, I find it easier to create all certificates with openssl and use openssl pkcs12 and keytool -importkeystore in order to create the java keystore for use with kafka.

This conversion is done for localhost here: https://gist.github.com/urso/cedfdab25b84c8ee3389ff0727d220ad#file-makefile-L38

And for the CA here:

Working with openssl and keytool and creating trust chains can be quite painful and time consuming.

Make changes and verify step by step. @Badger has given you great tips on how to progress.

Thanks Badger. Sorry for the naive questions, but I am a newbie to encryption. I followed this link https://docs.confluent.io/current/kafka/encryption.html#broker-keystore (similar to the kafka docs) to generate the certs/key/jks files for kafka and modified the server configs, and the kafka process seems to be running fine. I am able to read messages from a different topic (unrelated to the filebeat in question). The only change I made from the doc is that wherever it had

-alias localhost

I used -alias <kafka IP>

As mentioned earlier, my filebeat resides on a different cluster. Do I need to move any files from kafka to generate the certs/keys in filebeat? More specifically, how do I generate the bolded block below on the filebeat node to update output.kafka?

output.logstash:
  hosts: ["logs.mycompany.com:5044"]
  **ssl.certificate_authorities: ["/etc/ca.crt"]**
  **ssl.certificate: "/etc/client.crt"**
  **ssl.key: "/etc/client.key"**

At this point I am trying to establish a one-to-one SSL connection between filebeat and kafka. I also need to scale to many-to-many nodes in the cluster, preferably using self-signed certificates. I am not looking to have SSL for inter-broker kafka communication.

Kafka server config
# Listener List - Comma-separated list of URIs we will listen on and the listener names.
# If the listener name is not a security protocol, listener.security.protocol.map must also be set.
# Specify hostname as 0.0.0.0 to bind to all interfaces. Leave hostname empty to bind to default interface.
# Examples of legal listener lists: PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
#listeners=PLAINTEXT://<kafka IP>:9092
listeners=ONSL://<kafka IP>:9092,INSL://<kafka IP>:9093


# advertised listener part
advertised.listeners=ONSL://<kafka IP>:9092,INSL://<kafka IP>:9093
inter.broker.listener.name=INSL
listener.security.protocol.map=INSL:SSL,ONSL:PLAINTEXT


# ssl encryption config from docs
ssl.keystore.location=/opt/obuildfactory/jdk-1.8.0-openjdk-x86_64/jre/bin/mlb_ssl/kafka.server.keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>
ssl.truststore.location=/opt/obuildfactory/jdk-1.8.0-openjdk-x86_64/jre/bin/mlb_ssl/kafka.client.truststore.jks
ssl.truststore.password=<password>

Also, I was trying to use the SAN approach so that when I move to multiple filebeat and multiple kafka nodes I can generate one self-signed certificate shared by all kafka nodes in the cluster and another shared by all filebeat nodes in the cluster (instead of having a separate key/cert per kafka/filebeat node). Unfortunately, I don't have the ability to work with CAs. Is this a feasible alternative solution? Kindly advise.

If you use self-signed certificates then you created the signing certificate, so you are the CA. Note that step 2 in the document you linked to is "Create your own Certificate Authority (CA)". You got that over to filebeat in pem format already, right? If you think kafka is working then, as I said, test with verification_mode none in filebeat.
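
For that test, a minimal output section would look something like this (the path is an assumption; none is for testing only):

output.kafka:
  hosts: ["<kafka IP>:9093"]
  ssl.certificate_authorities: ["/etc/filebeat_ssl/ca.pem"]
  ssl.verification_mode: none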

Hi Badger, I am getting this error now

2018-02-09T19:38:25Z INFO Setup Beat: filebeat; Version: 5.2.1
2018-02-09T19:38:25Z CRIT Failed loading client certificate%!(EXTRA *errors.errorString=tls: private key does not match public key)
2018-02-09T19:38:25Z ERR failed to initialize kafka plugin as output: 1 error: tls: private key does not match public key
2018-02-09T19:38:25Z CRIT Exiting: error initializing publisher: 1 error: tls: private key does not match public key

I converted the ca-cert file on kafka to a pem file using an openssl command and copied it to the filebeat node.
Also, I had a filebeat private key from earlier (which I assume is of no use now). Anyway, I tried both of these configs and got the above error.
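
For reference, my understanding is that a key and certificate can be checked for a match by comparing their moduli (the two md5 hashes should be identical for a matching pair); the filenames here are the ones from my nodes:

openssl x509 -noout -modulus -in kafka-filebeat.pem | openssl md5
openssl rsa -noout -modulus -in kafka-key.key | openssl md5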

Filebeat config

output.kafka:
  hosts: ["<kafka IP>:9093"]
  topic: '%{[type]}'
  # ssl crt/key files
  ssl.certificate: "/etc/filebeat_ssl/kafka-filebeat.pem"
  ssl.key: "/etc/filebeat_ssl/kafka-key.key" # copied kafka key(ca-key)
  ssl.verification_mode:none
  compression: gzip

Config 2

output.kafka:
  hosts: ["<kafka IP>:9093"]
  topic: '%{[type]}'
  # ssl crt/key files
  ssl.certificate: "/etc/filebeat_ssl/kafka-filebeat.pem"
  ssl.key: "/etc/filebeat_ssl/filebeat.key" # filebeat key generated using openssl genrsa -out filebeat_.key 2048 
  ssl.verification_mode:none
  compression: gzip

These are the files I currently have on my kafka node:

ca-cert ca-cert.srl ca-key cert-file cert-signed kafka-filebeat.pem kafka.client.truststore.jks kafka.server.keystore.jks

Thanks so much for your guidance.

Respectfully,
Gangadhar

On the filebeat side you do not want to have ssl.certificate configured unless you are trying to use client authentication. If you just want to encrypt the connection then all you need is ssl.certificate_authorities, which should point to your ca cert in pem format.
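
In other words, something like this (path assumed, pointing at the pem you converted from your ca-cert):

output.kafka:
  hosts: ["<kafka IP>:9093"]
  topic: '%{[type]}'
  ssl.certificate_authorities: ["/etc/filebeat_ssl/kafka-filebeat.pem"]
  compression: gzip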

OK, it doesn't show the same error now, but I'm hitting a different one:

2018-02-09T20:23:36Z WARN client/metadata fetching metadata for all topics from broker <kafka IP>:9093

2018-02-09T20:23:36Z WARN Failed to connect to broker <kafka IP>:9093: tls: first record does not look like a TLS handshake

2018-02-09T20:23:36Z WARN kafka message: client/metadata got error from broker while fetching metadata:%!(EXTRA tls.RecordHeaderError=tls: first record does not look like a TLS handshake)
2018-02-09T20:23:36Z WARN kafka message: client/metadata no available broker to send metadata request to
2018-02-09T20:23:36Z WARN client/brokers resurrecting 1 dead seed brokers
2018-02-09T20:23:36Z WARN kafka message: Closing Client
2018-02-09T20:23:36Z ERR Kafka connect fails with: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2018-02-09T20:23:36Z ERR Connect failed with: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2018-02-09T20:23:40Z WARN kafka message: Initializing new client
2018-02-09T20:23:40Z WARN client/metadata fetching metadata for all topics from broker <kafka IP>:9093

Updated filebeat config

output.kafka:
  hosts: ["<kafka IP>:9093"]
  topic: '%{[type]}'
  # ssl crt/key files
  # ssl.certificate: "/etc/filebeat_ssl/kafka-filebeat.pem"
  # ssl.key: "/etc/filebeat_ssl/kafka-key.key"
  ssl.certificate_authorities: ["/etc/filebeat_ssl/kafka-filebeat.pem"] # pem format of kafka ca-cert file
  # ssl.verification_mode: none
  compression: gzip

I created the kafka-filebeat.pem file on the kafka node using this command

openssl x509 -in ca-cert -out kafka-filebeat.pem -outform PEM

and manually copied it to the filebeat node.

What does "openssl s_client -connect :9093 < /dev/null" produce? I would expect it say CONNECTED, followed by some errors in the chain verification, followed by Certificate Chain, Server Certificate, and then the details of the negotiated session.

Hi Badger, I really appreciate your guidance and prompt responses. I executed the command from the filebeat node. This is the output:

    openssl s_client -connect <kafka IP>:9093 < /dev/null
    CONNECTED(00000003)
    depth=1 C = US, ST = Texas, L = Dallas, O = IBM Bluemix Infrastructure, OU = SoftLayer, emailAddress = monitoring@softlayer.com, CN = kafka
    verify error:num=19:self signed certificate in certificate chain
    verify return:0
    ---
    Certificate chain
     0 s:/C=US/ST=Texas/L=Dallas/O=IBM/OU=Softlayer/CN=Gangadhar Mahadevan
       i:/C=US/ST=Texas/L=Dallas/O=IBM Bluemix Infrastructure/OU=SoftLayer/emailAddress=monitoring@softlayer.com/CN=kafka
     1 s:/C=US/ST=Texas/L=Dallas/O=IBM Bluemix Infrastructure/OU=SoftLayer/emailAddress=monitoring@softlayer.com/CN=kafka
       i:/C=US/ST=Texas/L=Dallas/O=IBM Bluemix Infrastructure/OU=SoftLayer/emailAddress=monitoring@softlayer.com/CN=kafka
    ---
    Server certificate
    -----BEGIN CERTIFICATE-----
 certificate
    -----END CERTIFICATE-----
    subject=/C=US/ST=Texas/L=Dallas/O=IBM/OU=Softlayer/CN=Gangadhar Mahadevan
    issuer=/C=US/ST=Texas/L=Dallas/O=IBM Bluemix Infrastructure/OU=SoftLayer/emailAddress=monitoring@softlayer.com/CN=kafka
    ---
    No client certificate CA names sent
    ---
    SSL handshake has read 2494 bytes and written 479 bytes
    ---
    New, TLSv1/SSLv3, Cipher is DHE-DSS-AES128-GCM-SHA256
    Server public key is 1024 bit
    Secure Renegotiation IS supported
    Compression: NONE
    Expansion: NONE
    SSL-Session:
        Protocol  : TLSv1.2
        Cipher    : DHE-DSS-AES128-GCM-SHA256
        Session-ID: 5A7E0AD98AA394F1FDF7051D715AAE1787CF372CD606DB2B31026B093ACDEDE4
        Session-ID-ctx:
        Master-Key: C50EBBAACC596939CCABCDE5D69D6FF255E85A801AB8F318AA649D6B0F4C47BCF19B1010EB212518EC0DDF68AB259BEA
        Key-Arg   : None
        PSK identity: None
        PSK identity hint: None
        SRP username: None
        Start Time: 1518209751
        Timeout   : 300 (sec)
        Verify return code: 19 (self signed certificate in certificate chain)
    ---
    DONE

OK, so openssl connects OK, which suggests kafka is correctly configured. I'm running out of ideas here. Searching turns up a number of other occurrences of this error (e.g. this, this, and this).

Can you remove the non-SSL entry from advertised listeners?

It would be interesting to see what Wireshark or tcpdump says about the handshake. Maybe you can do a capture like tcpdump -i eth0 -w handshake.pcap tcp port 9093. Then take a look at the pcap in Wireshark.
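
For example, on the filebeat node (the interface name is a guess; list yours with ip link):

tcpdump -i eth0 -w handshake.pcap host <kafka IP> and tcp port 9093

You can also get a quick text readout without Wireshark via tcpdump -r handshake.pcap -nn -tttt.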

Hi Badger, This is my current kafka config

listeners=ONSL://<IP>:9092,INSL://<IP>:9093


# advertised listener part
advertised.listeners=ONSL://<IP>,INSL://<IP>:9093
inter.broker.listener.name=INSL
listener.security.protocol.map=INSL:SSL,ONSL:PLAINTEXT

When I changed the config to 

listeners=ONSL://<IP>:9092,INSL://<IP>:9093


# advertised listener part
advertised.listeners=INSL://<IP>:9093
# inter.broker.listener.name=INSL
# listener.security.protocol.map=INSL:SSL,ONSL:PLAINTEXT

and start kafka, it fails with this error:

[2018-02-09 15:29:59,906] FATAL  (kafka.Kafka$)
java.lang.IllegalArgumentException: Error creating broker listeners from 'ONSL://172.16.0.110:9092,INSL://172.16.0.110:9093': No security protocol defined for listener ONSL
        at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:278)

Have I misunderstood anything?

Hi Andrew, do you want me to run the tcpdump command on the filebeat node or the kafka node? They reside on different nodes in my environment. Also, should I replace any arguments, like eth0, with other values? Kindly advise.

You cannot comment that out. You still have a listener ONSL://<IP> defined, so you need that map line to let kafka know that it is really PLAINTEXT. Just update advertised.listeners so that it only includes the INSL entry.
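
To be explicit, keep the listeners and map lines as they are and trim only advertised.listeners, something like:

listeners=ONSL://<IP>:9092,INSL://<IP>:9093
advertised.listeners=INSL://<IP>:9093
inter.broker.listener.name=INSL
listener.security.protocol.map=INSL:SSL,ONSL:PLAINTEXT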

Hi,

So the updated config is

# advertised listener part
advertised.listeners=INSL://<IP>:9093
#advertised.listeners=ONSL://<IP>:9092,INSL://<IP>:9093
inter.broker.listener.name=INSL
listener.security.protocol.map=INSL:SSL,ONSL:PLAINTEXT

and the kafka logs show:

[2018-02-09 15:42:33,373] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
        at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:212)
        at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:170)
        at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703)
        at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
        at org.apache.kafka.common.network.Selector.doClose(Selector.java:717)
        at org.apache.kafka.common.network.Selector.close(Selector.java:708)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:500)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
        at kafka.network.Processor.poll(SocketServer.scala:535)
        at kafka.network.Processor.run(SocketServer.scala:452)
        at java.lang.Thread.run(Thread.java:745)
[2018-02-09 15:42:35,635] ERROR [KafkaApi-1] Error when handling request {topics=[local-dev-vergil-adc01-rg-syslog]} (kafka.server.KafkaApis)
kafka.common.BrokerEndPointNotAvailableException: Broker `1` does not have listener with name `ListenerName(ONSL)`

This kafka topic local-dev-vergil-adc01-rg-syslog is outside of our filebeat context, and it seems this topic is affected by the above config. Please advise.