Kafka input failover

If I have a Kafka topic replicated across two brokers, will Logstash fail over to the second broker if the first becomes unavailable?

It's not working for me, but I'm not sure whether Logstash is supposed to fail over in this case. When the current broker dies, we see this log line:
[Consumer clientId=logstash-0, groupId=logstash] Marking the coordinator elkrck1:5044 (id: 2147483647 rack: null) dead

Here is my Kafka input:

input {
  kafka {
    id => "kafka_tester"
    topics => ["monrkntest","kubernetes_audit"]
    decorate_events => "true"
    bootstrap_servers => "elkrck1:5044,elkrck2:5044"
    auto_offset_reset => "earliest"
  }
}

Here is my topic:

./kafka-topics.sh --describe --zookeeper elkrck1:2181 --topic monrkntest

Topic:monrkntest PartitionCount:1 ReplicationFactor:2 Configs:
Topic: monrkntest Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
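In the describe output above, the partition lists Replicas: 1,0 but Isr: 1, so at that moment broker 0 was not in the in-sync replica set. With unclean leader election disabled (the Kafka default since 0.11), an out-of-sync replica is not eligible to become leader. The function below, under_replicated (a hypothetical name), is a minimal awk-based sketch that reads kafka-topics.sh --describe output on stdin and prints any partition whose Isr list is shorter than its Replicas list. It assumes the Partition:/Replicas:/Isr: field layout shown above.

```shell
# Hypothetical helper: flag partitions whose ISR is smaller than the replica set.
# Reads `kafka-topics.sh --describe` output on stdin.
under_replicated() {
  awk '
    # Only per-partition lines contain the exact token "Partition:"
    # (the header line has "PartitionCount:" instead).
    /Partition:/ {
      part = ""; reps = ""; isr = ""
      for (i = 1; i <= NF; i++) {
        if ($i == "Partition:") part = $(i + 1)
        if ($i == "Replicas:")  reps = $(i + 1)
        if ($i == "Isr:")       isr  = $(i + 1)
      }
      # split() returns the number of comma-separated broker ids.
      if (split(isr, a, ",") < split(reps, b, ","))
        print "partition " part ": replicas=" reps " isr=" isr
    }
  '
}

# Usage:
#   ./kafka-topics.sh --describe --zookeeper elkrck1:2181 --topic monrkntest | under_replicated
```

For the output shown above this prints `partition 0: replicas=1,0 isr=1`; a fully in-sync partition prints nothing.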

I'm not sure it will help with failover, but try setting group_id and consumer_threads in the Kafka input.
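For reference, here is roughly what the input looks like with group_id and consumer_threads set explicitly. The sketch wraps the config in a hypothetical shell function, kafka_input_config, so the group id and thread count can be varied; the defaults (logstash, 1) are placeholders, not recommendations.

```shell
# Hypothetical helper: print the Kafka input pipeline with an explicit
# group_id and consumer_threads. Arguments: [group_id] [consumer_threads].
kafka_input_config() {
  group_id=${1:-logstash}
  threads=${2:-1}
  cat <<EOF
input {
  kafka {
    id => "kafka_tester"
    topics => ["monrkntest","kubernetes_audit"]
    decorate_events => "true"
    bootstrap_servers => "elkrck1:5044,elkrck2:5044"
    auto_offset_reset => "earliest"
    group_id => "${group_id}"
    consumer_threads => ${threads}
  }
}
EOF
}

# Usage: write the file, then have Logstash validate it without starting:
#   kafka_input_config logstash 2 > kafka.conf
#   bin/logstash -f kafka.conf --config.test_and_exit
```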

I've added a group_id and consumer_threads, but that didn't fix the issue. I'm wondering whether Logstash will only ever connect to one Kafka instance. I hope that's not the case.
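On the question of connecting to only one broker: the Kafka client uses bootstrap_servers just for its first connection, to fetch cluster metadata; after that it connects directly to the partition leaders and to the consumer-group coordinator, wherever they live. So every broker, not only the first one listed, has to be reachable from the Logstash host. The sketch below, check_bootstrap_servers (a hypothetical name), tries a TCP connection to each address in a bootstrap_servers string using bash's /dev/tcp and coreutils timeout. It only checks the listed addresses, not the addresses the brokers advertise in their metadata.

```shell
# Hypothetical helper: report whether each host:port in a comma-separated
# bootstrap_servers string accepts a TCP connection (3-second timeout each).
check_bootstrap_servers() {
  local entry host port
  for entry in $(printf '%s\n' "$1" | tr ',' ' '); do
    host=${entry%:*}
    port=${entry##*:}
    # bash opens /dev/tcp/HOST/PORT as a socket; host and port are passed
    # as positional arguments ($0, $1) rather than spliced into the script.
    if timeout 3 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null; then
      echo "$entry reachable"
    else
      echo "$entry unreachable"
    fi
  done
}

# Usage, with the servers from the input above:
#   check_bootstrap_servers "elkrck1:5044,elkrck2:5044"
```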

When I take the Kafka leader offline, these are the current Logstash logs:

[2018-04-16T09:52:01,875][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Discovered coordinator elkrck1:5044 (id: 2147483647 rack: null)
[2018-04-16T09:52:01,875][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Marking the coordinator elkrck1:5044 (id: 2147483647 rack: null) dead
[2018-04-16T09:52:01,978][WARN ][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Connection to node 0 could not be established. Broker may not be available.

ZooKeeper lists the only running Kafka instance as the only registered broker:

./zookeeper-shell.sh 10.170.226.86:2181 <<< "ls /brokers/ids"
Connecting to 10.170.226.86:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[1]
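Each live broker registers its id under /brokers/ids, so the [1] above means broker 0 is not registered. The sketch below, missing_brokers (a hypothetical name), compares that id list against the broker ids you expect (for this topic, the replica set 1,0) and prints any that are absent. It assumes the id list is the last line of the zookeeper-shell output, as in the session above.

```shell
# Hypothetical helper: print expected broker ids missing from a
# ZooKeeper "ls /brokers/ids" result such as "[1]" or "[0, 1]".
# Arguments: <registered-list> <expected-id>...
missing_brokers() {
  local registered ids expected id found
  registered=$1
  shift
  # Drop brackets and spaces, then turn commas into spaces: "[0, 1]" -> "0 1".
  ids=$(printf '%s\n' "$registered" | sed 's/[][ ]//g; s/,/ /g')
  for expected in "$@"; do
    found=0
    for id in $ids; do
      if [ "$id" = "$expected" ]; then found=1; fi
    done
    if [ "$found" -eq 0 ]; then echo "$expected"; fi
  done
}

# Usage:
#   missing_brokers "$(./zookeeper-shell.sh 10.170.226.86:2181 <<< "ls /brokers/ids" | tail -n 1)" 0 1
```

With the [1] result shown above and expected ids 0 and 1, this prints 0.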

echo stat | nc 10.170.226.12 2181
Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
Clients:
 /10.170.226.12:57072[1](queued=0,recved=1179,sent=1185)
 /10.170.226.86:59230[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/2/107
Received: 3529
Sent: 3563
Connections: 2
Outstanding: 0
Zxid: 0x1000001bb
Mode: leader
Node count: 161

After reading through the Kafka input plugin documentation, I bet it has to do with the session_timeout_ms setting, and possibly heartbeat_interval_ms too.
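As a sketch of how those two options could be set: Kafka's consumer documentation says heartbeat.interval.ms must be lower than session.timeout.ms and typically no higher than one third of it. The hypothetical helper below checks that relationship before printing the two lines to paste inside the kafka { } block. The values are emitted as quoted strings, which the Logstash config loader accepts for both string- and number-typed options; check the option types for your plugin version. The numbers in the usage line are illustrative only.

```shell
# Hypothetical helper: print session_timeout_ms / heartbeat_interval_ms for
# the Kafka input after sanity-checking them.
# Arguments: <session_timeout_ms> <heartbeat_interval_ms>
kafka_timeout_settings() {
  session_ms=$1
  heartbeat_ms=$2
  # Hard rule from the Kafka consumer docs: heartbeat < session timeout.
  if [ "$heartbeat_ms" -ge "$session_ms" ]; then
    echo "heartbeat_interval_ms must be lower than session_timeout_ms" >&2
    return 1
  fi
  # Soft guideline: heartbeat no more than one third of the session timeout.
  if [ $((heartbeat_ms * 3)) -gt "$session_ms" ]; then
    echo "warning: heartbeat_interval_ms is more than 1/3 of session_timeout_ms" >&2
  fi
  cat <<EOF
    session_timeout_ms => "${session_ms}"
    heartbeat_interval_ms => "${heartbeat_ms}"
EOF
}

# Usage:
#   kafka_timeout_settings 10000 3000
```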

I'll test this in a DEV environment in the next day or so, since I need to confirm it's working as well.
