Logstash stops outputting when one kafka node is down

Hello everybody,

I have a 3 kafka nodes cluster

Logstash has a kafka input and outputs to elasticsearch and syslog (I'm using the Logstash output isolator pattern; a sketch of the downstream pipelines follows the config below). When one kafka node is down (kafka3 in this case), logstash stops consuming/pushing logs to elasticsearch, with the following message:

Group coordinator kafka3:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery

Pipeline conf:

input {
  kafka {
    topics => ["default.linux"]
    codec => json
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    consumer_threads => 2
    enable_auto_commit => false
    decorate_events => true
    ssl_truststore_location => "/etc/logstash/kafka.server.truststore.jks"
    ssl_truststore_password => "123"
    ssl_truststore_type => "JKS"
    security_protocol => "SSL"
  }
}

output { pipeline { send_to => [es_filebeat, syslog] } }
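
For reference, each address in send_to is consumed by its own downstream pipeline declared in pipelines.yml. A minimal sketch of what the two downstream pipelines could look like - only the addresses es_filebeat and syslog come from the config above, the elasticsearch and syslog destinations are placeholders:

# downstream pipeline for the es_filebeat address (file path up to you)
input { pipeline { address => es_filebeat } }
output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]   # placeholder host
    index => "filebeat-%{+YYYY.MM.dd}"       # placeholder index
  }
}

# downstream pipeline for the syslog address
input { pipeline { address => syslog } }
output {
  syslog {
    host => "syslog.example.org"             # placeholder destination
    port => 514
    facility => "log audit"
    severity => "informational"
  }
}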

When I start the downed kafka node again, there are no more error messages:

[2021-04-27T18:10:36,988][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][poc-filebeat2][8dfdf7cf24fb7b5cb09ad368706e1e176d6bf914663fc10e1a5df56283e6de0c] [Consumer clientId=logstash-0, groupId=logstash] Discovered group coordinator kafka3:9092 (id: 2147483644 rack: null)
[2021-04-27T18:10:36,987][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][poc-filebeat2][8dfdf7cf24fb7b5cb09ad368706e1e176d6bf914663fc10e1a5df56283e6de0c] [Consumer clientId=logstash-1, groupId=logstash] Member logstash-1-e671c77f-6c8c-4606-9038-feecbe6b3bf5 sending LeaveGroup request to coordinator kafka3:9092 (id: 2147483644 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2021-04-27T18:10:36,991][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][poc-filebeat2][8dfdf7cf24fb7b5cb09ad368706e1e176d6bf914663fc10e1a5df56283e6de0c] [Consumer clientId=logstash-1, groupId=logstash] Giving away all assigned partitions as lost since generation has been reset, indicating that consumer is no longer part of the group
[2021-04-27T18:10:36,992][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][poc-filebeat2][8dfdf7cf24fb7b5cb09ad368706e1e176d6bf914663fc10e1a5df56283e6de0c] [Consumer clientId=logstash-1, groupId=logstash] Lost previously assigned partitions default.linux-2, default.linux-3
[2021-04-27T18:10:36,993][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][poc-filebeat2][8dfdf7cf24fb7b5cb09ad368706e1e176d6bf914663fc10e1a5df56283e6de0c] [Consumer clientId=logstash-1, groupId=logstash] (Re-)joining group

But I didn't expect that one kafka node being down would have such an impact...

Any idea on how to tell logstash to continue even if one kafka node is down?

Thanks ! :slight_smile:

Hello Travis,

I don't think this is a LogStash problem. Can you please check which replication factor your topic uses?

I am guessing that your topic has a replication factor of 1, which means no replication to other nodes. In that case you cannot access the data, as it is only stored on kafka3.

Best regards
Wolfram

Hello Wolfram,

Nope. I use a replication factor of 3 :

[root@kafka1 bin]# ./kafka-topics.sh --zookeeper localhost:2181 --topic default.linux --describe
Topic: default.linux    PartitionCount: 4       ReplicationFactor: 3    Configs:
        Topic: default.linux    Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
        Topic: default.linux    Partition: 1    Leader: 2       Replicas: 3,2,1 Isr: 2,1,3
        Topic: default.linux    Partition: 2    Leader: 1       Replicas: 1,3,2 Isr: 2,1,3
        Topic: default.linux    Partition: 3    Leader: 2       Replicas: 2,3,1 Isr: 2,1,3

I see...

How did you set up your cluster? Did you start with 3 nodes, or did you start with one node and add more nodes afterwards?

What is offsets.topic.replication.factor set to in your broker config?

I am still guessing it has something to do with the replication factor - maybe the consumer offsets topic is not mirrored to the other nodes...
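
A quick way to check is to grep the static broker config on each node; a sketch assuming a default install path (adjust the path to your setup):

# no match means the property is not set and the broker default applies
grep -E "offsets.topic.replication.factor|default.replication.factor" /opt/kafka/config/server.properties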

Yes, I started with 3 nodes.

I didn't change offsets.topic.replication.factor, so I must have the default setting, which is 3 according to the documentation - but maybe I should verify it?

Yep, good idea concerning the consumer offsets topic. Here is the output:

[root@kafka1 bin]# ./kafka-topics.sh --zookeeper localhost:2181 --topic __consumer_offsets --describe
Topic: __consumer_offsets       PartitionCount: 50      ReplicationFactor: 1    Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
        Topic: __consumer_offsets       Partition: 0    Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 1    Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 3    Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 4    Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 5    Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 6    Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 7    Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 8    Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 9    Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 10   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 11   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 12   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 13   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 14   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 15   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 16   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 17   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 18   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 19   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 20   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 21   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 22   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 23   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 24   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 25   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 26   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 27   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 28   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 29   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 30   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 31   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 32   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 33   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 34   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 35   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 36   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 37   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 38   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 39   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 40   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 41   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 42   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 43   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 44   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 45   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 46   Leader: none    Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 47   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 48   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 49   Leader: none    Replicas: 3     Isr: 3

So I guess that is the reason - every partition that is stored on kafka3 has no leader assigned, as the partition is not replicated.

The solution would be to reassign the partitions as described here:
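
Roughly, the stock kafka-reassign-partitions.sh tool can raise the replication factor of existing partitions. A sketch covering only partition 0 of __consumer_offsets (the same entry would be repeated for partitions 0-49); the broker IDs 1, 2, 3 come from your describe output, and the --zookeeper flag assumes an older Kafka release like the one used here:

# reassignment plan: spread each partition's replicas over all three brokers
cat > increase-rf.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "__consumer_offsets", "partition": 0, "replicas": [1, 2, 3] }
  ]
}
EOF

./kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-rf.json --execute

# re-run with --verify until every reassignment is reported as completed
./kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-rf.json --verify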

That does not make sense to me... Once kafka3 is down, a replica is promoted to leader, taking over kafka3's leader role. As you can see, even with kafka3 missing, all the partitions have a leader. So for me, all the data is there and logstash should consume.

What do you mean? Your output for the consumer offsets explicitly states that all partitions stored on Kafka3 have no leader (none):

If there was a leader for these partitions, I would expect one of the remaining brokers there, e.g. Leader: 2


My bad. You are totally right. My default for offsets.topic.replication.factor was 1.

To solve the issue I did these steps (a quick verification sketch follows the list):

  • stop kafka
  • set offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 in server.properties
  • ./zookeeper-shell.sh localhost:2181 <<< "deleteall /brokers/topics/__consumer_offsets"
  • rm /tmp/kafka-logs/meta.properties
  • start kafka
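
To double check, re-running the earlier describe command after the restart should now report ReplicationFactor: 3 and all three brokers in every Isr (expected output sketched in the comments, exact Configs may differ):

./kafka-topics.sh --zookeeper localhost:2181 --topic __consumer_offsets --describe
# expected header, roughly:
#   Topic: __consumer_offsets   PartitionCount: 50   ReplicationFactor: 3   Configs: ...
# and each partition line should list three brokers under Replicas and Isr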

Thanks a lot Wolfram! :slight_smile: Next time I will read the documentation more carefully.

One more question: with 3 kafka brokers, if I lose 1 or 2 brokers, do you think that logstash will be impacted? I know that logstash will be able to consume, but is there any side effect? (like more load)

As long as the topics are distributed over all kafka nodes, LogStash should not be impacted, provided the remaining kafka nodes are not under too much pressure. Of course, if the last remaining kafka node has lots of documents coming in and out, the performance of LogStash reading data from kafka could be impacted too...


Understood thank you !

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.