I want to have multiple Logstash instances reading from a single Kafka topic, where each log message in the topic is read by only one of the Logstash instances. I could not find any documentation about this.
I have some doubts about this deployment:
Let's say we have a Kafka topic named logstash_logs with three partitions.
Doubt 1
Assume for now that all the partitions are on the same Kafka broker machine. Is the below Logstash input config correct for all the Logstash instances?
What is the correct number of consumer_threads? If I have 3 Logstash instances reading from the Kafka topic, is consumer_threads => 1 in each Logstash input config the right number, or should I set consumer_threads => 3 on each machine (3 being the number of partitions in the topic)? And what if I have only two Logstash instances reading from the topic?
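For reference, a minimal kafka input along these lines might look like the following (a sketch, not the poster's actual config; the zk_connect address is a placeholder, and the option names are those of the Logstash 1.5/2.x kafka input plugin):

```
input {
  kafka {
    zk_connect       => "localhost:2181"   # placeholder ZooKeeper address
    topic_id         => "logstash_logs"
    group_id         => "logstash"         # same group_id on every instance
    consumer_threads => 1
  }
}
```

Using the same group_id on every instance is what makes Kafka hand each partition to only one consumer in the group.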
Your config is correct for your first concern. The Kafka input uses Kafka's High Level Consumer, so within a consumer group (designated by group_id) each partition is always read by exactly one thread and no more. If you have more threads than partitions, some threads won't have anything to do. If you have more partitions than threads, some threads will work on more than one partition. Also, with more partitions than threads, the Kafka client uses a single fetcher that fetches messages from all of the topic's partitions.
For the second question, a simple formula for the number of consumer threads is to take the number of instances (I), divide by the number of partitions (P), and take the floor of that value; that is the maximum number of consumer_threads (T) you should have. So T = floor(I/P). In your case, you have three instances and three partitions, so 1 is correct.
If you are running on EC2, you can put your Logstash consumers in an autoscaling group and scale on CPU consumption, with a maximum of three instances and a minimum of one; your Logstash cluster will then resize as the load fluctuates and redistribute the partitions appropriately. At peak load you would have three instances working on the logs, each with its own partition, and at trough times you would have just one instance consuming all the partitions.
Hi Joe,
Is there a typo in the formula for consumer threads per Logstash instance? Shouldn't it be T = floor(P/I), not T = floor(I/P)?
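As a quick sanity check of the corrected formula (a hypothetical helper for illustration, not part of Logstash):

```python
import math

def max_consumer_threads(partitions, instances):
    # Corrected formula from the thread: T = floor(P / I), so the total
    # number of threads across all instances never exceeds the number
    # of partitions.
    return math.floor(partitions / instances)

# 3 partitions, 3 Logstash instances -> 1 thread per instance
print(max_consumer_threads(3, 3))   # 1
# 24 partitions, 3 instances -> 8 threads per instance
print(max_consumer_threads(24, 3))  # 8
```

With the original T = floor(I/P), two instances on three partitions would give floor(2/3) = 0 threads, which makes no sense; floor(P/I) gives 1.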
I'm using Logstash 2.1 (logstash-2.1.1-1.noarch.rpm). I hit an issue with balancing between the consumer threads of multiple Logstash instances. I have 1 topic with 24 partitions and 3 Logstash instances consuming the topic. The kafka input config on each Logstash instance is:
However, I found that the topic was not consumed evenly:
$./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group logstash --topic test
Group Topic Pid Offset logSize Lag Owner
logstash test 0 15915 16401 486 host1
logstash test 1 17535 20657 3122 host1
logstash test 2 9833 9833 0 host1
logstash test 3 12822 12871 49 host1
logstash test 4 7599 7705 106 host1
logstash test 5 18055 18057 2 host1
logstash test 6 14273 14395 122 host1
logstash test 7 15870 16292 422 host1
logstash test 8 2211 2211 0 host2
logstash test 9 4442 4442 0 host2
logstash test 10 8854 8854 0 host2
logstash test 11 5618 5618 0 host2
logstash test 12 4460 4460 0 host2
logstash test 13 6607 6607 0 host2
logstash test 14 4432 4432 0 host2
logstash test 15 8882 8882 0 host2
logstash test 16 4536 6347 1811 host2
logstash test 17 8867 41720 32853 host2
logstash test 18 5420 10002 4582 host2
logstash test 19 5106 8267 3161 host2
logstash test 20 18694 18694 0 host3
logstash test 21 9054 9054 0 host3
logstash test 22 4444 4444 0 host3
logstash test 23 4400 4402 2 host3
host1 consumed 8 partitions, but host2 consumed 12 partitions when it should also have been 8, so host3 could only consume 4 partitions.
We can also see that the extra 4 partitions consumed by host2 are lagging by many messages.
Any ideas on how to balance consumers across multiple hosts?
Kenny, set consumer_threads to 1. Even with 3 hosts you are balancing 24 threads across the group, which is why you get the uneven 8-12-4 split. That should also reduce thread contention. Sorry for the slow response.
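Concretely, the suggested fix would look something like this on each of the three hosts (a sketch; the zk_connect address is a placeholder, and an explicit group_id is shown so all instances join the same consumer group):

```
input {
  kafka {
    zk_connect       => "localhost:2181"   # placeholder
    topic_id         => "test"
    group_id         => "logstash"         # shared by all three hosts
    consumer_threads => 1                  # one thread per instance
  }
}
```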
I see you don't have a group_id declared in your input config. Is group_id not required for the input plugin? Will multiple instances still round-robin the consumption of a single topic without it?
Where do the 3 groups come into the picture? I thought the 3 instances were all in one group. And why does setting consumer_threads to 1 help? Does it just make 1 thread read from more partitions?
Each 'consumer_thread' acts as a member of the consumer group, so each Logstash system was configured to have 8 consumers in the group. The partitions are distributed among those consumers however the rebalance algorithm decides. Frankly, looking back, it's strange that it ended up being an 8-12-4 split, but that's how it balanced out. Generally the owner names are something other than host1...3 and more like hostname-random-hash, so it may have been an 8-6-6-4 split; I don't know.
So to answer your question, it was always one 'group', but with three hosts and a maximum of 8 consumers in the group per host. The consumers themselves divvy up all 24 partitions via the rebalance algorithm (which I don't know off the top of my head), so occasionally you can end up with an unequal number of partitions per consumer thread.
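For illustration, Kafka's default range assignment strategy (my assumption about which rebalance algorithm was in play here; the post's author says he doesn't recall it) can be sketched like this:

```python
def range_assign(partitions, consumers):
    # Sketch of Kafka's range assignment strategy: sort the consumer
    # IDs, give each consumer floor(P/C) partitions, and hand one extra
    # partition to each of the first P % C consumers.
    consumers = sorted(consumers)
    per = len(partitions) // len(consumers)
    extra = len(partitions) % len(consumers)
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        n = per + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + n]
        start += n
    return assignment

# 24 partitions, one consumer thread per host -> 8 partitions each
counts = {c: len(p) for c, p in
          range_assign(list(range(24)), ["host1", "host2", "host3"]).items()}
print(counts)  # {'host1': 8, 'host2': 8, 'host3': 8}
```

With 24 consumer threads (8 per host) the split comes out to one partition per thread, but which host each thread lives on depends on how the sorted consumer IDs interleave, which is how lopsided per-host totals can appear.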
As for your second question (why is 1 consumer_thread as effective as multiple?): the thread does nothing but read from the Kafka consumer threads and place messages onto a queue. Kafka already makes a thread per partition underneath the consumer_thread, so you really end up with a thread that says, "for all the partition threads underneath my consumer thread, push messages to my queue," which then pushes each message into the Logstash input thread. Kafka is already threading for each partition, so all the parallelism is already capitalized upon by Kafka's implementation. Adding additional consumer_threads just adds extra middlemen between the Logstash queue and the Kafka consumer threads.
The jruby-kafka consumer threads are the 'consumer_thread' in the config. Because they just copy from the Kafka consumer threads into the jruby-kafka consumer thread and then into the Logstash input queue thread, there isn't much efficiency to be gained by increasing the number of those middle threads; Kafka is already threading efficiently natively.
I typed all this on a phone so I hope it makes sense. If it doesn't let me know and I'll give it another shot on my laptop.