Connectivity Issue between Winlogbeat - Kafka - Logstash

Hello,

I am using Elastic Stack v8, composed of:

  • 2 servers: master + data

  • 1 server: data + Kibana

  • 1 server: Kafka + Logstash

  • Winlogbeat configuration:

output.kafka:
  hosts: ["X.X.X.X:9092"]
  topic: "Windows"

  • Kafka server properties:

broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=3
delete.topic.enable = true
log.dirs=/home/admin1/kafka/logs

  • Logstash conf.d configuration:

input {
        kafka {
         codec => json
         bootstrap_servers => "localhost:9092"
         topics => ["Windows"]
        }
}
filter {
}

output {
        stdout {
         codec => json_lines
        }
        elasticsearch {
         hosts =>["https://X.X.X.X:9200","https://Y.Y.Y.Y:9200","https://Z.Z.Z.Z:9200"]
         cacert => '/etc/logstash/config/certs/ca/ca.crt'
         user => "elastic"
         password => "4mW081ofQx78Bq9UxRjC"
         index => "winlogbeat-%{YYYY.MM.dd}"
         document_id => "%{id}"
        }
}
The issue is that Kafka is not electing a leader for the topic Windows, as shown in the Logstash logs:

[2022-04-11T06:01:20,590][WARN ][org.apache.kafka.clients.NetworkClient][main][64e5d8efc9999e885111495bfffac42e92f12766ae3bbe0dd059d9a315612c18] [Consumer clientId=logstash-0, groupId=logstash] Error while fetching metadata with correlation id 1012686 : {Windows=LEADER_NOT_AVAILABLE}


  • Kafka logs:

[2022-04-11 06:01:20,077] INFO [ExpirationReaper-1-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-04-11 06:01:20,097] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-04-11 06:01:20,102] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-04-11 06:01:20,166] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2022-04-11 06:01:20,186] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2022-04-11 06:01:20,240] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2022-04-11 06:01:20,251] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2022-04-11 06:01:20,251] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2022-04-11 06:01:20,307] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-04-11 06:01:20,334] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2022-04-11 06:01:20,363] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Starting socket server acceptors and processors (kafka.network.SocketServer)
[2022-04-11 06:01:20,386] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2022-04-11 06:01:20,387] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Started socket server acceptors and processors (kafka.network.SocketServer)
[2022-04-11 06:01:20,396] INFO Kafka version: 3.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-11 06:01:20,396] INFO Kafka commitId: 37edeed0777bacb3 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-11 06:01:20,396] INFO Kafka startTimeMs: 1649656880387 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-11 06:01:20,403] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
[2022-04-11 06:01:20,492] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use broker localhost:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2022-04-11 06:01:20,509] INFO [BrokerToControllerChannelManager broker=1 name=alterIsr]: Recorded new controller, from now on will use broker localhost:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)


I would like to know why Kafka is not electing a leader for the topic:

$ ./kafka-topics.sh --list --bootstrap-server localhost:9092
--list
Linux
Windows
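
A possible next check, sketched under assumptions: besides --list, kafka-topics.sh also accepts --describe, which prints a Leader field for each partition. The helper below reduces that output to "partition leader" pairs so that a missing leader stands out. The name partition_leaders is hypothetical (it is not part of Kafka), and the helper assumes the "Partition: N  Leader: M" line layout printed by --describe.

```shell
# Hypothetical helper (not part of Kafka): reads `kafka-topics.sh --describe`
# output on stdin and prints "<partition> <leader>" for each partition line.
# Assumes each partition line contains "Partition: <n>" and "Leader: <id>".
partition_leaders() {
  awk '{
    p = ""; l = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Partition:") p = $(i + 1)   # partition number
      if ($i == "Leader:")    l = $(i + 1)   # broker id, or "none"
    }
    # The topic summary line has no "Partition:" field, so it is skipped.
    if (p != "") print p, l
  }'
}
```

It could be used as: ./kafka-topics.sh --describe --topic Windows --bootstrap-server localhost:9092 | partition_leaders. A leader of "none" on a partition would be consistent with the LEADER_NOT_AVAILABLE warning in the Logstash logs.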
