Enable SASL Authentication for Kafka Output in Filebeat Configuration

output.kafka:
  hosts: ["10.42.41.55:19094"]
  topic: 'npc-raw-msg'
  key: '%{[UUID]}'
  codec.format:
    string: '%{[message]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  close_inactive: 10m

We now need to enable SASL authentication (either PLAIN or SCRAM-SHA-512) for connecting Filebeat to Kafka.

Please provide guidance or a configuration example for securely setting up SASL authentication between Filebeat and Kafka.

Hi @Thirupathi

You can enable SASL authentication (SCRAM-SHA-512) with TLS in Filebeat's Kafka output using the following minimal configuration. Note that Filebeat's Kafka output takes top-level `username` and `password` settings together with `sasl.mechanism`; there is no `security.protocol` option as in the Java Kafka clients, and enabling the `ssl` settings alongside SASL gives you the equivalent of SASL_SSL:

output.kafka:
  hosts: ["10.42.41.55:19094"]
  topic: "npc-raw-msg"
  key: "%{[UUID]}"
  sasl.mechanism: SCRAM-SHA-512
  username: "${kafka.username}"
  password: "${kafka.password}"
  ssl.enabled: true
  ssl.certificate_authorities: ["/etc/filebeat/certs/ca.pem"]

Steps to securely store credentials using Filebeat Keystore:

filebeat keystore create # Run once if keystore not created
filebeat keystore add kafka.username
filebeat keystore add kafka.password

  • Reference the keystore entries in the configuration as ${kafka.username} and ${kafka.password}; Filebeat resolves them at startup.

  • Ensure the CA certificate path points to your Kafka broker’s certificate authority.

  • This configuration uses SASL-SCRAM over SSL for secure authentication and encryption.

This approach avoids hardcoding credentials and is production-ready.
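After restarting, you can verify the keystore entries and the output settings with Filebeat's own CLI (the config path below assumes a default package install; adjust it to your layout):

```shell
# List the keys stored in the Filebeat keystore; after the steps above it
# should print kafka.username and kafka.password.
filebeat keystore list

# Validate the configuration file, then attempt a real connection to the
# Kafka output using the resolved credentials.
filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml
```

`filebeat test output` performs the actual TLS handshake and SASL exchange against each broker, so a settings mismatch surfaces here before any events are lost.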

{"log.level":"debug","@timestamp":"2025-10-09T05:53:58.464-0400","log.logger":"registrar","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/registrar.(*Registrar).commitStateUpdates","file.name":"registrar/registrar.go","file.line":203},"message":"Registry file updated. 2 active states.","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2025-10-09T05:54:00.459-0400","log.logger":"publisher_pipeline_output","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*netClientWorker).run","file.name":"pipeline/client_worker.go","file.line":138},"message":"Connecting to kafka(10.42.41.55:19094)","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2025-10-09T05:54:00.459-0400","log.logger":"kafka","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Connect","file.name":"kafka/client.go","file.line":120},"message":"connect: [10.42.41.55:19094]","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2025-10-09T05:54:00.459-0400","log.logger":"publisher_pipeline_output","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*netClientWorker).run","file.name":"pipeline/client_worker.go","file.line":146},"message":"Connection to kafka(10.42.41.55:19094) established","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"error","@timestamp":"2025-10-09T05:54:00.461-0400","log.logger":"kafka","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker","file.name":"kafka/client.go","file.line":338},"message":"Kafka (topic=npc-raw-msg): kafka: couldn't fetch broker metadata (check that your client and broker are using the same encryption and authentication settings)","service.name":"filebeat","ecs.version":"1.6.0"}

^C{"log.level":"info","@timestamp":"2025-10-09T05:54:04.050-0400","log.logger":"service","log.origin":{"function":"github.com/elastic/elastic-agent-libs/service.HandleSignals.func1","file.name":"service/service.go","file.line":52},"message":"Received signal \"interrupt\", stopping","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2025-10-09T05:54:04.050-0400","log.logger":"publisher","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).Close","file.name":"pipeline/pipeline.go","file.line":167},"message":"close pipeline","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2025-10-09T05:54:05.051-0400","log.logger":"kafka","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Close","file.name":"kafka/client.go","file.line":141},"message":"closed kafka client","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2025-10-09T05:54:05.051-0400","log.logger":"publisher","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*queueReader).run","file.name":"pipeline/queue_reader.go","file.line":54},"message":"pipeline event consumer queue reader: stop","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"error","@timestamp":"2025-10-09T05:54:05.051-0400","log.logger":"kafka","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).errorWorker","file.name":"kafka/client.go","file.line":338},"message":"Kafka (topic=npc-raw-msg): kafka: couldn't fetch broker metadata (check that your client and broker are using the same encryption and authentication settings)","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"error","@timestamp":"2025-10-09T05:54:0
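The "couldn't fetch broker metadata" error almost always means the client and the listener on port 19094 disagree about encryption or the SASL mechanism. For reference, a broker listener matching the SCRAM-over-TLS client settings would look roughly like this in server.properties (hostnames, paths, and passwords below are illustrative assumptions, not values from this thread):

```properties
# Hypothetical SASL_SSL listener with SCRAM-SHA-512 enabled
listeners=SASL_SSL://0.0.0.0:19094
advertised.listeners=SASL_SSL://10.42.41.55:19094
sasl.enabled.mechanisms=SCRAM-SHA-512
ssl.keystore.location=/etc/kafka/certs/broker.keystore.jks
ssl.keystore.password=changeit
```

If the listener is SASL_SSL, the client must also speak TLS; a plaintext SASL client, or a TLS client pointed at a SASL_PLAINTEXT listener, fails with exactly this metadata error.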

output.kafka:
  hosts: ["xx.xx.xx:19094"]
  topic: 'npc-raw-msg'
  key: '%{[UUID]}'
  codec.format:
    string: '%{[message]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  close_inactive: 10m
  sasl.mechanism: SCRAM-SHA-512
  sasl.username: "test-user-ranjith"
  sasl.password: "xSBWc2w6NmkiPi4+YyBs"
  security.protocol: SASL_PLAINTEXT

logging.level: debug
logging.to_files: true
logging.files:
  path: /var/log/filebeat-kafka
  name: filebeat
  keepfiles: 7
  permissions: 0644

Any update on the above issue?