Below is my logstash config file logstash.conf. Lotstash reads logs from Kafka topic 'my_topic' and outputs it to elasticsearch index 'es-index'.
input {
kafka {
bootstrap_servers =>"kafka1.xxx:9092,kafka2.xxx:9092,kafka3.xxx:9092"
topics => ["my_topic"]
codec => "json"
group_id => "logstashgroup"
}
}
output {
elasticsearch {
hosts => ["es1.myhost:9200","es2.myhost:9200","es3.myhost:9200"]
user => "user123"
password => "password"
index => "es-index"
}
}
filter {
json {
source => "message"
skip_on_invalid_json => true
}
}
It worked fine for a few months but recently it started throwing the following errors:
[ERROR][logstash.outputs.elasticsearch] An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely {:error_message=>"Could not read from stream: Corrupt GZIP trailer", :error_class=>"Manticore::StreamClosedException"
and,
[FATAL][logstash.runner ] An unexpected error occurred! {:error=>org.apache.kafka.common.KafkaException: Received exception when fetching the next record from rumbl_log-1. If needed, please seek past the record to continue consumption., :backtrace=>["org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(org/apache/kafka/clients/consumer/internals/Fetcher.java:1469)", "org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(org/apache/kafka/clients/consumer/internals/Fetcher.java:1328)",
then,
[ERROR][logstash.javapipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::Kafka codec=><LogStash::Codecs::JSON id=>"json_9a7a9d96-d7be-4292-a3de-67f797d22ab5"
then,
Error: Received exception when fetching the next record from my_topic-1. If needed, please seek past the record to continue consumption.
Exception: Java::OrgApacheKafkaCommon::KafkaException
and then logstash stops
[ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
I'm not sure what's the problem. I couldn't find anything relevant to solve the error. .