For a few weeks now, my yellow topic has had problems being consumed: the Logstash consumers seem to stop. If I restart Logstash, it starts consuming again, but the speed slowly decreases until it stops entirely. I set up a single Logstash instance with only the yellow Kafka topic, and after some initial processing it stops within 1 or 2 minutes.
I can see via the monitoring API that there are no inputs:
$ curl -s http://localhost:9600/_node/stats/pipeline | jq ".pipeline.plugins.inputs"
[]
but I do have one defined:
kafka {
  topics => ["yellow"]
  group_id => "yellow"
  bootstrap_servers => "localhost:9092"
  codec => "json"
  consumer_threads => 3
  client_id => "miaF03"
  decorate_events => true
}
The same config works fine for the green and red topics, so I suspect some specific message is causing this.
I also see the following in the Logstash logs, which is probably the cause:
Mar 31 18:33:31 miaF03 logstash[21507]: Exception in thread "Ruby-0-Thread-93: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:229" java.util.NoSuchElementException
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.utils.AbstractIterator.next(org/apache/kafka/common/utils/AbstractIterator.java:52)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(org/apache/kafka/common/record/MemoryRecords.java:308)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(org/apache/kafka/common/record/MemoryRecords.java:221)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(org/apache/kafka/common/utils/AbstractIterator.java:79)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.utils.AbstractIterator.hasNext(org/apache/kafka/common/utils/AbstractIterator.java:45)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(org/apache/kafka/clients/consumer/internals/Fetcher.java:545)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(org/apache/kafka/clients/consumer/internals/Fetcher.java:354)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(org/apache/kafka/clients/consumer/KafkaConsumer.java:1000)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:938)
Mar 31 18:33:31 miaF03 logstash[21507]: at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
Mar 31 18:33:31 miaF03 logstash[21507]: at RUBY.thread_runner(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:239)
Mar 31 18:33:31 miaF03 logstash[21507]: at java.lang.Thread.run(java/lang/Thread.java:745)
But I have no idea how to fix this... maybe some message is triggering a problem in the kafka input?
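One thing I can try is to locate the offset where the consumer stalls and dump the raw records around it with the stock Kafka CLI tools (the group name matches my config above; the partition and offset values here are just placeholders, and flag availability may vary by broker version):

```shell
# Show committed offsets and lag for the stuck consumer group
# (on older 0.10.x brokers the --new-consumer flag may be required).
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group yellow

# Then dump a few records from the suspect partition starting at the
# committed offset, to see if one of them is corrupt or oddly encoded
# (replace the partition/offset with the values reported above).
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic yellow --partition 0 --offset 123456 --max-messages 10
```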
I'm using:
logstash-input-kafka 6.2.6
Logstash 5.3.0
Debian jessie