Multiple logstash reading from a single kafka topic

I want to have multiple logstash reading from a single kafka topic. The log message in a kafka topic should be read by only one of the logstash instances. I could not find any doc related to this.

I have some doubts regarding this deployment:-

Let say we have a kafka topic named logstash_logs with three partitions.

Doubt 1


If the partitions are placed in a same kafka broker machine for now. Is the below logstash input config correct for all the logstash instances:-

input {
    kafka {
        zk_connect => "kafka:2181"
        group_id => "logstash"
        topic_id => "logstash_logs"
        reset_beginning => false
        consumer_threads => 1
    }
}

Doubt 2


What is the correct no. of consumer_threads to add? If I have 3 logstash instances reading from the kafka topic. Then is consumer_threads => 1 in each logstash input config good number or I should add consumer_threads => 3 in each machine. (3 is the no. of partitions in the topic). What if I have only two logstash instances reading from the kafka topic?

2 Likes

Your config is correct for your first concern. The Kafka input uses Kafka's High Level Consumer so for in consumer group, designated by group_id, each partition always get a thread to read it and none more. If you have more threads than partitions, some threads won't have anything to do. If you have more partitions than threads, some threads will have more than one partition that they work on. Also with more partitions than threads the Kafka client uses a single fetcher which will fetch messages from all the topic partitions.

For the second question, a simple formula to figure out the number of consumer threads is to take the number of instances (I) and divide by the number of partitions (P) and then take the floor of that value and that is the maximum number of consumer_threads (T) you should have. So T = floor(I/P). In your case, you have three instances and three topics so 1 is correct.

If you are running on EC2, you can put your logstash consumers in an autoscaling group and scale by CPU consumption, up to three instances, minimum one and then your logstash cluster will size as the load fluctuates and distribute the partitions appropriately. So at peak load you would have three instances working on the logs, each with their own partitions and at trough times you would just have one instance consuming all the partitions.

3 Likes

Thanks again Joe.

Hi Joe,
Is there a typo to calculate consumer threads for each log instance. It should be T = floor(P/I), not T = floor(I/P)?

I'm using logstash2.1(logstash-2.1.1-1.noarch.rmp). I hit a issue about the balancing between consumer threads of multiple logstash instance. I have 1 topic with 24 partitions, 3 logstash instance to consume this topic. The kafka input config on each logstash instance is:

input {
    kafka {
        zk_connect => "x.x.x.x:2181, y.y.y.y:2181, z.z.z.z:2181"
        topic_id => "test"
        consumer_threads => 8
        fetch_message_max_bytes => 20971520
    }
}

However, I found the topic was consumed not evenly from Kafka:

$./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group logstash --topic test
Group           Topic                          Pid Offset          logSize         Lag             Owner
logstash        test                           0   15915           16401           486             host1
logstash        test                           1   17535           20657           3122            host1
logstash        test                           2   9833            9833            0               host1
logstash        test                           3   12822           12871           49              host1
logstash        test                           4   7599            7705            106             host1
logstash        test                           5   18055           18057           2               host1
logstash        test                           6   14273           14395           122             host1
logstash        test                           7   15870           16292           422             host1
logstash        test                           8   2211            2211            0               host2
logstash        test                           9   4442            4442            0               host2
logstash        test                           10  8854            8854            0               host2
logstash        test                           11  5618            5618            0               host2
logstash        test                           12  4460            4460            0               host2
logstash        test                           13  6607            6607            0               host2
logstash        test                           14  4432            4432            0               host2
logstash        test                           15  8882            8882            0               host2
logstash        test                           16  4536            6347            1811            host2
logstash        test                           17  8867            41720           32853           host2
logstash        test                           18  5420            10002           4582            host2
logstash        test                           19  5106            8267            3161            host2
logstash        test                           20  18694           18694           0               host3
logstash        test                           21  9054            9054            0               host3
logstash        test                           22  4444            4444            0               host3
logstash        test                           23  4400            4402            2               host3

host1 consumed 8 partitions, but host2 consumed 12 partitions, it should be 8 partitions. So host3 could only consume 4 partitions.
We could also see that the additional 4 partitions consumed by host2 have many messages lagged behind.

Any idea to balance consumers across multiple hosts?

Thanks,
Kenny

1 Like

Kenny set consumer threads to 1. Even with 3 hosts you are balancing three
groups with 24 threads so that's why you get 8-8-4. That should also reduce
thread contention. Sorry for the slow response.

1 Like

Thanks, Joe.

I see you don't have a group declared in your input config. Is the group_id not required for input plugin and multiple instances will still round robin the consumption for a single topic?

1 Like

The group id by default is "logstash" so no worries there.
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-group_id

2 Likes

Where is the 3 groups come into the picture? I thought the 3 instances are all in one group. And why does setting the consumer threads to 1 help? Does it just make 1 thread reading from more partitions?

Each 'consumer_thread' is acting as a member of the consumer group. So for each system Logstash was configured to have 8 consumers in the group. The partitions can be distributed amongst those groups as the rebalance algorithm decides. Frankly looking back it's strange that it ended up being a 8-12-4 split but that's how it balanced out. Generally the names are something other than host1...3 and more like hostname-random-hash sonit may have been a 8-6-6-4 split. I dunno.

So to answer your question it was always one 'group' but three hosts with a maximum of 8 consumers in the group per host. The consumers themselves will divy up all 24 partitions with the rebalance algorithm (which I don't know off the top of my head). So occasionally you can end up with an unequal number of partitions per consumer group thread.

As for your second question, Why is 1 'consumer_thread' as effective as multiple, the thread is doing nothing but reading from Kafka threads and placing the messages onto a queue. Kafka is already making a thread per partition underneath the consumer_thread so you really end up with a thread that says, for all the partition threads underneath my consumer thread, push messages to the queue in my consumer_thread which the pushes the message into the Logstash thread. Kafka is already threading for each queue so all the parallelism is already capitalized upon via Kafka's implementation. Adding additional consumer_threads just makes an extra middle man between the Logstash queue and the Kafka consumer threads.

So you get

[Kafka consumer thread 0] --
[Kafka consumer thread 1]----->jruby-kafka consumer thread -> Logstash input thread
[Kafka cobsumer thread 2]---/
...
[Kafka consumer thread 23]-^

The jruby-kafka consumer threads are the 'consumer_thread' in the config. So because it's just copying from Kafka consumer threads to the jruby-kafka consumer thread and then into the Logstash input queue thread there isn't much efficiency gained by increasing that number of middle threads as Kafka is already natively threading efficiently.

I typed all this on a phone so I hope it makes sense. If it doesn't let me know and I'll give it another shot on my laptop.

3 Likes