Filebeat connect kafka failed over ssl

I have a working Filebeat 6.1.2-linux-x86_64 and Kafka 0.10 setup.

I am trying to set them up to use SSL(TLS) instead of PLAINTEXT.

my filebeat.yml

...
output.kafka:
enabled: true
hosts: ["kafkabroker:9092"]
topic: 'filebeat_test'
version: 0.10.1.0
required_acks: 1
ssl.enabled: true
ssl.certificate_authorities: ["/home/work/cert/ca.pem"]
ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]
ssl.certificate: "/home/work/cert/client.pem"
ssl.key: "/home/work/cert/client.key"

my running log

2018/01/24 03:12:11.097927 log.go:36: INFO client/metadata fetching metadata for all topics from broker [[kafkabroker:9092]]
2018/01/24 03:12:11.103970 log.go:36: INFO Failed to connect to broker [[kafkabroker:9092 x509: certificate is valid for mybroker, not kafkabroker]]: %!s(MISSING)
2018/01/24 03:12:11.104012 log.go:36: INFO kafka message: [client/metadata got error from broker while fetching metadata: x509: certificate is valid for mybroker, not kafkabroker]
2018/01/23 11:47:38.197954 log.go:36: INFO kafka message: [client/metadata no available broker to send metadata request to]
2018/01/23 11:47:38.197969 log.go:36: INFO client/brokers resurrecting [[1]] dead seed brokers
2018/01/23 11:47:38.197984 log.go:36: INFO kafka message: [Closing Client]
2018/01/23 11:47:38.198005 client.go:69: ERR Kafka connect fails with: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2018/01/23 11:47:38.198017 output.go:74: ERR Failed to connect: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

But I try to use the go demo to new a kafka producer over ssl, it works well.
the code is shown as follows:

package main

import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka."
"io/ioutil"
"log"
"os"
)

var (
broker = flag.String("broker", "kafkabroker:9092", "host")
topic = flag.String("topic", "", "Required, the topic to consume from. You can create a topic from console.")
enableTLS = flag.Bool("enableTLS", true, "TLS is required to access Kafka service.")
clientPemPath = flag.String("client_pem", "client.pem", "client_pem")
clientKeyPath = flag.String("client_key", "client.key", "client_key")
caPemPath = flag.String("ca_pem", "ca.pem", "ca_pem")
)

func main() {
if len(os.Args[1:]) == 0 {
flag.Usage()
os.Exit(1)
}

flag.Parse()

if *topic == "" {
panic("Argument topic is required.")
}

sarama.Logger = log.New(os.Stderr, "[sarama]", log.LstdFlags)

config := sarama.NewConfig()
config.Version = sarama.V0_10_1_0
config.Net.TLS.Enable = *enableTLS
if *enableTLS {
config.Net.TLS.Config = configTLS()
}

config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Retry.Max = 5

producer, err := sarama.NewSyncProducer(string{*broker}, config)
if err != nil {
log.Panic(err)
}

defer func() {
if err := producer.Close(); err != nil {
log.Panic(err)
}
}()

numOfRecords := 10
for i := 0; i < numOfRecords; i++ {
message := &sarama.ProducerMessage{
Topic: *topic,
Value: sarama.StringEncoder(fmt.Sprintf("%d-hello kafka", i)),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatal(err)
}

}
}

func configTLS() (t *tls.Config) {
checkFile(*clientPemPath)
checkFile(*clientKeyPath)
checkFile(*caPemPath)

clientPem, err := tls.LoadX509KeyPair(*clientPemPath, *clientKeyPath)
if err != nil {
log.Panic(err)
}

caPem, err := ioutil.ReadFile(*caPemPath)
if err != nil {
log.Panic(err)
}

certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caPem)
t = &tls.Config{
Certificates: tls.Certificate{clientPem},
RootCAs: certPool,
InsecureSkipVerify: true,
}

return t
}

func checkFile(path string) {
file, err := os.Open(path)
if err != nil {
panic(err)
}

stat, err := file.Stat()
if err != nil {
panic(err)
}

if stat.Size() == 0 {
panic("Please replace " + path + " with your own. ")
}

}

the ca.Pem,client.Pem and client.key are same as the config of filebeat, and the demo is running in the same machine. Then the consumer can get the message.
What am I missing in the config of filebeat?
I would appreciate any help at this point, thanks!

To me, this looks like the core of the issue: x509: certificate is valid for mybroker, not, which should be interpreted as x509: certificate is valid for [mybroker], not [].

I think in the filebeat.yml you need something like: hosts: ["mybroker:9092"] where mybroker should resolve to the Kafka broker.

Thanks for your help.
Actually, I have the config like : hosts: ["kafkabroker:9092"] in my filebeat.yml. kafkabroker is
hostname of kafka server, but mybroker is not.

In addition, I recheck the log and have found that the previous log missed something, the whole log information is shown as follows.

2018/01/24 03:12:11.097927 log.go:36: INFO client/metadata fetching metadata for all topics from broker [[kafkabroker:9092]]
2018/01/24 03:12:11.103970 log.go:36: INFO Failed to connect to broker [[kafkabroker:9092 x509: certificate is valid for mybroker, not kafkabroker]]: %!s(MISSING)
2018/01/24 03:12:11.104012 log.go:36: INFO kafka message: [client/metadata got error from broker while fetching metadata: x509: certificate is valid for mybroker, not kafkabroker]

Thanks again for your help.

Ok, but it seems that the certificate is for mybroker, while you are trying to connect to kafkabroker. If you can you recreate the certificate, make sure to specify kafkabroker as the CNAME.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.