I have a working Filebeat 6.1.2-linux-x86_64 and Kafka 0.10 setup.
I am trying to set them up to use SSL(TLS) instead of PLAINTEXT.
my filebeat.yml
...
output.kafka:
enabled: true
hosts: ["kafkabroker:9092"]
topic: 'filebeat_test'
version: 0.10.1.0
required_acks: 1
ssl.enabled: true
ssl.certificate_authorities: ["/home/work/cert/ca.pem"]
ssl.supported_protocols: [TLSv1.0, TLSv1.1, TLSv1.2]
ssl.certificate: "/home/work/cert/client.pem"
ssl.key: "/home/work/cert/client.key"
my running log
2018/01/24 03:12:11.097927 log.go:36: INFO client/metadata fetching metadata for all topics from broker [[kafkabroker:9092]]
2018/01/24 03:12:11.103970 log.go:36: INFO Failed to connect to broker [[kafkabroker:9092 x509: certificate is valid for mybroker, not kafkabroker]]: %!s(MISSING)
2018/01/24 03:12:11.104012 log.go:36: INFO kafka message: [client/metadata got error from broker while fetching metadata: x509: certificate is valid for mybroker, not kafkabroker]
2018/01/23 11:47:38.197954 log.go:36: INFO kafka message: [client/metadata no available broker to send metadata request to]
2018/01/23 11:47:38.197969 log.go:36: INFO client/brokers resurrecting [[1]] dead seed brokers
2018/01/23 11:47:38.197984 log.go:36: INFO kafka message: [Closing Client]
2018/01/23 11:47:38.198005 client.go:69: ERR Kafka connect fails with: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2018/01/23 11:47:38.198017 output.go:74: ERR Failed to connect: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
But I try to use the go demo to new a kafka producer over ssl, it works well.
the code is shown as follows:
package main
import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka."
"io/ioutil"
"log"
"os"
)var (
broker = flag.String("broker", "kafkabroker:9092", "host")
topic = flag.String("topic", "", "Required, the topic to consume from. You can create a topic from console.")
enableTLS = flag.Bool("enableTLS", true, "TLS is required to access Kafka service.")
clientPemPath = flag.String("client_pem", "client.pem", "client_pem")
clientKeyPath = flag.String("client_key", "client.key", "client_key")
caPemPath = flag.String("ca_pem", "ca.pem", "ca_pem")
)func main() {
if len(os.Args[1:]) == 0 {
flag.Usage()
os.Exit(1)
}flag.Parse()
if *topic == "" {
panic("Argument topic is required.")
}sarama.Logger = log.New(os.Stderr, "[sarama]", log.LstdFlags)
config := sarama.NewConfig()
config.Version = sarama.V0_10_1_0
config.Net.TLS.Enable = *enableTLS
if *enableTLS {
config.Net.TLS.Config = configTLS()
}config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Retry.Max = 5producer, err := sarama.NewSyncProducer(string{*broker}, config)
if err != nil {
log.Panic(err)
}defer func() {
if err := producer.Close(); err != nil {
log.Panic(err)
}
}()numOfRecords := 10
for i := 0; i < numOfRecords; i++ {
message := &sarama.ProducerMessage{
Topic: *topic,
Value: sarama.StringEncoder(fmt.Sprintf("%d-hello kafka", i)),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatal(err)
}}
}func configTLS() (t *tls.Config) {
checkFile(*clientPemPath)
checkFile(*clientKeyPath)
checkFile(*caPemPath)clientPem, err := tls.LoadX509KeyPair(*clientPemPath, *clientKeyPath)
if err != nil {
log.Panic(err)
}caPem, err := ioutil.ReadFile(*caPemPath)
if err != nil {
log.Panic(err)
}certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caPem)
t = &tls.Config{
Certificates: tls.Certificate{clientPem},
RootCAs: certPool,
InsecureSkipVerify: true,
}return t
}func checkFile(path string) {
file, err := os.Open(path)
if err != nil {
panic(err)
}stat, err := file.Stat()
if err != nil {
panic(err)
}if stat.Size() == 0 {
panic("Please replace " + path + " with your own. ")
}}
the ca.Pem,client.Pem and client.key are same as the config of filebeat, and the demo is running in the same machine. Then the consumer can get the message.
What am I missing in the config of filebeat?
I would appreciate any help at this point, thanks!