Kafka input not resuming where it stopped


(Mihir Ray) #1

Hi,

I am reading data from a Kafka cluster and processing it with Logstash. The input looks like this:

input {
  kafka {
    topic_id => "sb_logs"
    codec => "plain"
    zk_connect => "cpanaetl01.sling.com:2181,cpanaetl02.sling.com:2181,cpanaetl03.sling.com:2181"
    consumer_threads => 20
  }
}

If I stop Logstash for a few hours and start it again, it reads only the most recent data instead of resuming where it stopped.
Is there anything wrong with my config?


(Joe Lawson) #2

First thing, put the thread number down to 1. Kafka is already making
threads per partition, so there is no parallelism to be gained. We should
remove that option from the plugin. Also set your consumer group name,
otherwise it'll assign a random one each time, which is why it isn't resuming.
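
A minimal sketch of the input with both suggestions applied, assuming the ZooKeeper-based kafka input from the question, where the consumer group is set with group_id. The group name "sb_logs_consumer" is a hypothetical value chosen for illustration:

```
input {
  kafka {
    zk_connect => "cpanaetl01.sling.com:2181,cpanaetl02.sling.com:2181,cpanaetl03.sling.com:2181"
    topic_id => "sb_logs"
    codec => "plain"
    # Explicit consumer group so offsets are stored under a stable name.
    # "sb_logs_consumer" is a hypothetical name, not taken from the thread.
    group_id => "sb_logs_consumer"
    # One consumer thread, as suggested above.
    consumer_threads => 1
  }
}
```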

Hope this helps!

Sincerely,

Joe


(Mihir Ray) #3

That makes sense. Thanks.
Please add this to the documentation, because it currently says the group name defaults to logstash.


(Joe Lawson) #4

You know, I was wrong about the group: it is in fact logstash by default, so it is strange that it wouldn't resume. Do you have other Logstash consumers running with the same group?


(Mihir Ray) #5

Yes, I have multiple consumers in the same group, but for different topics.
Sometimes I have seen that the Logstash consumers do not create an offset entry in ZooKeeper.


(Mihir Ray) #6

Hi,
I ran a check on multithreading.

Without the consumer_threads parameter:

/opt/kafka/kafka_2.11-0.8.2.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test_log --group analytics_test_log --zookeeper etl01.abc.com:2181,etl02.abc.com:2181,etl03.abc.com:2181
Group               Topic     Pid  Offset     logSize    Lag      Owner
analytics_test_log  test_log  0    198264791  199599448  1334657  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  1    167470850  168806166  1335316  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  2    190270643  192428588  2157945  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  3    183026805  185389546  2362741  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  4    174819301  177646047  2826746  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  5    188196243  189569061  1372818  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  6    174346801  178962394  4615593  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  7    171562338  172310342  748004   analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  8    180201753  185593524  5391771  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log  test_log  9    182661181  184789707  2128526  analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0

With consumer_threads:

/opt/kafka/kafka_2.11-0.8.2.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test_log --group analytics_test_log --zookeeper etl01.abc.com:2181,etl02.abc.com:2181,etl03.abc.com:2181
Group               Topic     Pid  Offset     logSize    Lag      Owner
analytics_test_log  test_log  0    198347704  199599448  1251744  analytics_test_log_etl03.abc.com-1444632839752-e081907b-0
analytics_test_log  test_log  1    167559379  168806166  1246787  analytics_test_log_etl03.abc.com-1444632839752-e081907b-1
analytics_test_log  test_log  2    190354993  192428588  2073595  analytics_test_log_etl03.abc.com-1444632839752-e081907b-2
analytics_test_log  test_log  3    183109216  186047539  2938323  analytics_test_log_etl03.abc.com-1444632839752-e081907b-3
analytics_test_log  test_log  4    174903832  177646047  2742215  analytics_test_log_etl03.abc.com-1444632839752-e081907b-4
analytics_test_log  test_log  5    188281063  189569061  1287998  analytics_test_log_etl03.abc.com-1444632839752-e081907b-5
analytics_test_log  test_log  6    174432717  178962394  4529677  analytics_test_log_etl03.abc.com-1444632839752-e081907b-6
analytics_test_log  test_log  7    171649245  173122454  1473209  analytics_test_log_etl03.abc.com-1444632839752-e081907b-7
analytics_test_log  test_log  8    180285840  186082448  5796608  analytics_test_log_etl03.abc.com-1444632839752-e081907b-8
analytics_test_log  test_log  9    182747656  184789707  2042051  analytics_test_log_etl03.abc.com-1444632839752-e081907b-9

If you compare the two outputs, in the first case all the owner suffixes are the same (0), while in the second the suffixes run from 0 to 9, one per partition. It looks like Logstash is not multithreading by default.

Please let me know if that's not the case.
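
The comparison above can also be checked mechanically. Here is a small Python sketch that counts partitions and distinct owner IDs in ConsumerOffsetChecker output. The function name summarize_owners is made up for illustration, and the sample strings contain only the first two rows of each run shown above:

```python
def summarize_owners(checker_output: str) -> dict:
    """Count partitions, distinct owners, and total lag in ConsumerOffsetChecker output."""
    owners = set()
    partitions = 0
    total_lag = 0
    for line in checker_output.strip().splitlines():
        fields = line.split()
        # Skip the header row and anything that is not a 7-column data row.
        if len(fields) != 7 or fields[0] == "Group":
            continue
        partitions += 1
        total_lag += int(fields[5])  # "Lag" column
        owners.add(fields[6])        # "Owner" column
    return {"partitions": partitions, "owners": len(owners), "total_lag": total_lag}


# First two rows of the run without consumer_threads.
SINGLE = """\
Group Topic Pid Offset logSize Lag Owner
analytics_test_log test_log 0 198264791 199599448 1334657 analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
analytics_test_log test_log 1 167470850 168806166 1335316 analytics_test_log_etl03.abc.com-1444631984622-cfd962f7-0
"""

# First two rows of the run with consumer_threads.
THREADED = """\
Group Topic Pid Offset logSize Lag Owner
analytics_test_log test_log 0 198347704 199599448 1251744 analytics_test_log_etl03.abc.com-1444632839752-e081907b-0
analytics_test_log test_log 1 167559379 168806166 1246787 analytics_test_log_etl03.abc.com-1444632839752-e081907b-1
"""

if __name__ == "__main__":
    print(summarize_owners(SINGLE))
    print(summarize_owners(THREADED))
```

An owner count of 1 means a single consumer thread owns every partition. An owner count equal to the partition count means each partition has its own thread.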

Thanks


(Joe Lawson) #7

This is getting a little off topic, but the tl;dr is that the underlying
jruby-kafka consumer thread isn't doing anything but multiplexing from the
reader, which creates a consumer thread per partition, into the queue that
Logstash passes in. So yes, there are more Kafka streams, but each one isn't
adding anything. Check out the discussion here:

When the latest major version of jruby-kafka is released, the
logstash-kafka-input will need to change to pass a process into the Kafka
stream, which could add parallelism. Even then it probably won't help
much, because Logstash has a serialized processing chain, i.e. input
queue > filter queue > output queue. If Logstash were more like Spark and
maintained parallel threads for each chain, then bumping the number of Kafka
streams would help.

