Error pushing logs to Elasticsearch using Kafka as input for Logstash

Below is my Logstash config file, logstash.conf. Logstash reads logs from the Kafka topic 'my_topic' and writes them to the Elasticsearch index 'es-index'.

input {
  kafka {
    bootstrap_servers => "kafka1.xxx:9092,kafka2.xxx:9092,kafka3.xxx:9092"
    topics => ["my_topic"]
    codec => "json"
    group_id => "logstashgroup"
  }
}
output {
  elasticsearch {
    hosts => ["es1.myhost:9200","es2.myhost:9200","es3.myhost:9200"]
    user => "user123"
    password => "password"
    index => "es-index"
  }
}
filter {
  json {
    source => "message"
    skip_on_invalid_json => true
  }
}

It worked fine for a few months but recently it started throwing the following errors:

[ERROR][logstash.outputs.elasticsearch] An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely {:error_message=>"Could not read from stream: Corrupt GZIP trailer", :error_class=>"Manticore::StreamClosedException"

and,

[FATAL][logstash.runner ] An unexpected error occurred! {:error=>org.apache.kafka.common.KafkaException: Received exception when fetching the next record from rumbl_log-1. If needed, please seek past the record to continue consumption., :backtrace=>["org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(org/apache/kafka/clients/consumer/internals/Fetcher.java:1469)", "org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(org/apache/kafka/clients/consumer/internals/Fetcher.java:1328)",

then,

[ERROR][logstash.javapipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::Kafka codec=><LogStash::Codecs::JSON id=>"json_9a7a9d96-d7be-4292-a3de-67f797d22ab5"

then,

Error: Received exception when fetching the next record from my_topic-1. If needed, please seek past the record to continue consumption.
Exception: Java::OrgApacheKafkaCommon::KafkaException

and then Logstash stops:

[ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

I'm not sure what the problem is, and I couldn't find anything relevant that would help me resolve the error.

It seems that one of the Kafka events in the partition "rumbl_log-1" is corrupted.
To work around it, advance the consumer group's offset for that partition by one (+1) so that Logstash skips the corrupted event.
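
One way to shift the offset is Kafka's own kafka-consumer-groups.sh tool. The following is a minimal sketch, not a verified fix. It assumes the Kafka command-line tools are installed on the machine you run it from, and that Logstash has been stopped first, since offsets can only be reset while the consumer group is inactive. The broker address, group name, and topic are copied from the config in the question; the partition number 1 comes from the error message. The helper name reset_cmd is made up for this example. The script only prints the commands so they can be reviewed before being run.

```shell
#!/bin/sh
# Assumptions (adjust to your environment):
#   - values below are taken from the logstash.conf in the question
#   - partition 1 is the one named in the error ("my_topic-1" / "rumbl_log-1")
BOOTSTRAP="kafka1.xxx:9092"
GROUP="logstashgroup"
TOPIC_PARTITION="my_topic:1"   # format is topic:partition

# Hypothetical helper: prints the kafka-consumer-groups.sh command line.
# $1 is either --dry-run (preview the new offset) or --execute (apply it).
reset_cmd() {
  echo kafka-consumer-groups.sh \
    --bootstrap-server "$BOOTSTRAP" \
    --group "$GROUP" \
    --topic "$TOPIC_PARTITION" \
    --reset-offsets --shift-by 1 "$1"
}

# Print the preview command first, then the one that actually moves the offset.
reset_cmd --dry-run
reset_cmd --execute
```

Run the printed --dry-run command first and check the reported new offset, then run the --execute one and start Logstash again. If the same exception comes back, the damaged data may cover more than one record, and a larger --shift-by value may be needed. Every record that is skipped this way is lost to Logstash.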
