Filebeat , Kafka Cluster and Zookeeper ensemble configuration with SASL authentication

Hi All, 
I have a working Filebeat, Kafka Cluster, and Zookeeper Ensemble  but  after  integrating   SASL authentication I am facing below exception, 

 Filebeat:-
2018-05-23T11:58:13+02:00 INFO kafka message: [client/metadata got error from broker while fetching metadata: dial tcp serverA:9092: getsockopt: connection refused]
2018-05-23T11:58:13+02:00 INFO kafka message: [client/metadata no available broker to send metadata request to]
2018-05-23T11:58:13+02:00 INFO client/brokers resurrecting [[1]] dead seed brokers
2018-05-23T11:58:21+02:00 INFO Non-zero metrics in the last 30s: beat.info.uptime.ms=30000 beat.memstats.gc_next=19871168 beat.memstats.memory_alloc=14716384 beat.memstats.memory_total=2026369082728 filebeat.harvester.open_files=70 filebeat.harvester.running=100 libbeat.config.module.running=0 libbeat.output.events.batches=1652 libbeat.output.events.failed=3383296 libbeat.output.events.total=3383296 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=4117 libbeat.pipeline.events.retry=3383296 registrar.states.current=295
2018-05-23T11:58:23+02:00 INFO client/metadata fetching metadata for [[[SASL] serverA:9092]] from broker %!s(MISSING)
2018-05-23T11:58:23+02:00 INFO Failed to connect to broker [[serverA:9092 dial tcp serverA:9092: getsockopt: connection refused]]: %!s(MISSING)
  
Zookeeper:- 
2018-05-23 07:39:59,476 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1):: ] - Got user-level KeeperException when processing sessionid:0x301cae0b3480002 type:delete cxid:0x48 zxid:0x20000004e txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
2018-05-23 07:40:39,240 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x200b4f13c190006 type:create cxid:0x20 zxid:0x200000052 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
2018-05-23 07:40:39,240 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x200b4f13c190006 type:create cxid:0x21 zxid:0x200000053 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
2018-05-23 07:41:00,864 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x301cae0b3480004 type:create cxid:0x20 zxid:0x200000058 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
2018-05-23 07:41:00,864 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x301cae0b3480004 type:create cxid:0x21 zxid:0x200000059 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
2018-05-23 07:41:28,456 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x200b4f13c190002
2018-05-23 07:41:29,563 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x301cae0b3480002
2018-05-23 07:41:29,569 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x200b4f13c190006 type:create cxid:0x2d zxid:0x20000005f txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = NodeExists for /controller
2018-05-23 07:41:29,679 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x301cae0b3480004 type:delete cxid:0x4e zxid:0x200000061 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
 
Kafka:- 
 [2018-05-23 09:06:31,969] ERROR [ReplicaFetcherThread-0-1]: Error for partition [23MAY,0] to broker 1:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
 
ERROR [ReplicaFetcherThread-0-2]: Current offset 142474 for partition [23MAY,1] out of range; reset offset to 142478 (kafka.server.ReplicaFetcherThread)
 
 ERROR [ReplicaFetcherThread-0-2]: Error for partition [23MAY,2] to broker 2:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
 
Below are my configuration:- 
Filebeat:-  

Filebeat.yml 
filebeat.prospectors:
 - type: log
   enabled: true
   paths:
      - /usr/local/dsxdev/apache-tomee-instances/PEO-DEV-23/logs/*.*
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["serverA:9092", "serverB:9092", "serverC:9092"]
  topic: "Kafka"
  username: "admin"
  password: "admin-secret"
  client_id: "DEV"

Zookeeper:- 
  java.env
SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper/conf/ZK_jaas.conf"
 
 ZK_jaas.conf
Server
{ org.apache.zookeeper.server.auth.DigestLoginModule required  username="admin"   password="admin-secret"   user_admin="admin-secret";  }
;
 QuorumServer
{        org.apache.zookeeper.server.auth.DigestLoginModule required        user_test="test"; }
;
 QuorumLearner
{        org.apache.zookeeper.server.auth.DigestLoginModule required        username="test"        password="test"; }
;
zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/zookeeper/zookeeper-3.4.12/data
dataLogDir=/zookeeper/zookeeper-3.4.12/data-logs
clientPort=2181
maxClientCnxns=60
 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
quorumListenOnAllIPs=true
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.cnxn.threads.size=20
autopurge.snapRetainCount=5
autopurge.purgeInterval=0
 server.1=serverA:2888:3888
server.2=serverB:2888:3888
server.3=serverC:2888:3888

Kafka Configuration:-

Kafka :- 
 kafka_jaas.conf
KafkaServer {    org.apache.kafka.common.security.plain.PlainLoginModule required    username="admin"    password="admin-secret"    user_admin="admin-secret"; }
;
Client
{    org.apache.zookeeper.server.auth.DigestLoginModule required    username="admin"    password="admin-secret"; }
;
 
server.properties
 broker.id=0
delete.topic.enable=true
port=9092
group.id=KAFKA
log.dirs=/kafka/logs01
zookeeper.connect=serverA:2181,serverB:2181,serverC:2181
zookeeper.connection.timeout.ms=6000
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=false
listeners=SASL_PLAINTEXT://serverA:9092   ------------------------------>  serverB for broker 2 and serverC for broker 3
advertised.listeners=SASL_PLAINTEXT://serverA:9092  --------------- >  serverB for broker 2 and serverC for broker 3
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
advertised.host.name=serverA
num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=30000000
log.flush.interval.ms=1800000
log.retention.minutes=30
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
delete.topic.enable=true
super.users=User:admin
 
kafka-run-class.sh 
added JVM parameter  in  kafka-run-class.sh

Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_jaas.conf"
 
 Please let me know what i am missing in my configuration . 

Thanks in advance .

Log from filebeat indicates, filebeat can not connect to kafka. Connection is refused on TCP level.

Is kafka even running stable?

Have you tried ping/telnet from filebeat host to the kafka brokers hosts?

No idea about the zookeeper logs.

With these kafka logs I wonder if the kafka cluster is actually stable. But timestamps do not match the other services log timestamps. It seems none of the logs shown are related in one or the other way.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.