Kafka input slows down and then stops consuming

For the past few weeks, my yellow topic has had problems being consumed: the Logstash consumers seem to stop. If I restart Logstash, it starts consuming again, but the rate slowly decreases until it stops. I set up a single Logstash instance with only the yellow Kafka topic, and after some initial processing it stops after 1 or 2 minutes.

I can see via the monitoring API that no inputs are listed:

$ curl -s http://localhost:9600/_node/stats/pipeline | jq ".pipeline.plugins.inputs"
[]

but I do have one defined:


  kafka {
    topics => "yellow"
    group_id => "yellow"
    bootstrap_servers => "localhost:9092"
    codec => "json"
    consumer_threads => 3
    client_id => "miaF03"
    decorate_events => true
  }

The same config works fine for the green and red topics, so I suspect a specific message is causing this.
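
To put numbers on the slowdown, the same monitoring endpoint can be polled over time. The following is only a minimal sketch, assuming the Logstash 5.x stats layout that the curl call above relies on (a top-level `pipeline` key with `events.out` and `plugins.inputs`) and the default port 9600. The helper names `summarize`, `is_stalled` and `watch` are made up for illustration.

```python
import json
import time
import urllib.request

# Assumption: Logstash 5.x monitoring API on the default port, as in the curl call.
STATS_URL = "http://localhost:9600/_node/stats/pipeline"


def fetch_stats(url=STATS_URL):
    """Fetch the pipeline stats document from the monitoring API."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def summarize(stats):
    """Return (input plugin names, events-out counter) from a stats document."""
    pipeline = stats.get("pipeline", {})
    inputs = pipeline.get("plugins", {}).get("inputs", [])
    names = [plugin.get("name", "?") for plugin in inputs]
    events_out = pipeline.get("events", {}).get("out", 0)
    return names, events_out


def is_stalled(samples):
    """True when the events-out counter did not advance across the samples."""
    return len(samples) >= 2 and samples[-1] <= samples[0]


def watch(interval=10, rounds=6):
    """Hypothetical helper: poll a few times and report whether output stalled."""
    samples = []
    for _ in range(rounds):
        names, events_out = summarize(fetch_stats())
        print(f"inputs={names} events_out={events_out}")
        samples.append(events_out)
        time.sleep(interval)
    return is_stalled(samples)


if __name__ == "__main__":
    print("stalled" if watch() else "still consuming")
```

Run on the Logstash host right after a restart, this should show the counter climbing and then flattening if the consumer threads have died.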

I do see this in the Logstash logs, and it is probably the cause of the problem:

Mar 31 18:33:31 miaF03 logstash[21507]: Exception in thread "Ruby-0-Thread-93: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:229" java.util.NoSuchElementException
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.utils.AbstractIterator.next(org/apache/kafka/common/utils/AbstractIterator.java:52)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(org/apache/kafka/common/record/MemoryRecords.java:308)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(org/apache/kafka/common/record/MemoryRecords.java:221)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(org/apache/kafka/common/utils/AbstractIterator.java:79)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.common.utils.AbstractIterator.hasNext(org/apache/kafka/common/utils/AbstractIterator.java:45)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(org/apache/kafka/clients/consumer/internals/Fetcher.java:545)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(org/apache/kafka/clients/consumer/internals/Fetcher.java:354)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(org/apache/kafka/clients/consumer/KafkaConsumer.java:1000)
Mar 31 18:33:31 miaF03 logstash[21507]: at org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:938)
Mar 31 18:33:31 miaF03 logstash[21507]: at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
Mar 31 18:33:31 miaF03 logstash[21507]: at RUBY.thread_runner(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:239)
Mar 31 18:33:31 miaF03 logstash[21507]: at java.lang.Thread.run(java/lang/Thread.java:745)

But I have no idea how to fix this. Maybe some message is triggering a problem in the Kafka input?

I'm using:
name: "logstash-input-kafka",
version: "6.2.6"
logstash 5.3.0
Debian jessie
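
The gem path in the stack trace carries a plugin version, so one quick check is whether the plugin that actually loaded matches the version listed above. This is a rough sketch, assuming the usual `.../gems/<name>-<version>/...` layout seen in the log lines. The function name `loaded_gem_versions` is hypothetical.

```python
import re

# Matches ".../gems/<name>-<version>/..." as it appears in the stack trace paths.
GEM_RE = re.compile(r"/gems/(?P<name>[a-z0-9_-]+?)-(?P<version>\d+(?:\.\d+)+)/")


def loaded_gem_versions(log_lines):
    """Collect the distinct (gem name, version) pairs mentioned in log lines."""
    found = set()
    for line in log_lines:
        for match in GEM_RE.finditer(line):
            found.add((match.group("name"), match.group("version")))
    return sorted(found)


if __name__ == "__main__":
    sample = [
        'Exception in thread "Ruby-0-Thread-93: /usr/share/logstash/vendor/bundle/'
        'jruby/1.9/gems/logstash-input-kafka-5.1.6/lib/logstash/inputs/kafka.rb:229" '
        "java.util.NoSuchElementException",
    ]
    for name, version in loaded_gem_versions(sample):
        print(name, version)
```

In practice the lines could come from journalctl output saved to a file instead of the inline sample.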
