Kafka-Logstash-Elasticsearch disaster resistance

I am testing the disaster resistance of a Kafka -> Logstash -> Elasticsearch pipeline.
Logstash config:

input {
  kafka {
    bootstrap_servers => ","
    topics => ["test-test"]
    #consumer_threads => 2
    auto_offset_reset => "earliest"
    client_id => "logstash-1"
    group_id => "logstash-0"
    security_protocol => "SSL"
    ssl_keystore_location => "/opt/logstash/config/server.keystore.p12"
    ssl_keystore_password => "new_pass"
    ssl_key_password => "new_pass"
    ssl_truststore_location => "/opt/logstash/config/kafka-cluster.truststore.jks"
    ssl_truststore_password => "new_pass"
    ssl_endpoint_identification_algorithm => ""
  }
}

output {
  elasticsearch {
    hosts => ["", ""]
    ssl => true
    cacert => "/opt/logstash/config/elasticsearch-ca.pem"
    user => "logstash_internal"
    password => "new_pass"
    index => "test-replication"
  }
  #stdout { codec => rubydebug }
}

And I found a scenario in which data will not be written to Elasticsearch.
I launch Logstash and Kafka. After some time I kill Logstash (simulating a crash), and later I start Logstash again together with Elasticsearch. Data that was loaded into Logstash before the crash is never loaded into Elasticsearch.

Is there a way to load this data into Elasticsearch? Maybe reset the offset in Kafka to the last one written to Elasticsearch? Or maybe save this data to disk, so that when Logstash runs again it sends it to Elasticsearch and then erases it?
I didn't find these options in the plugins.
The best kind of solution for me would be for Logstash to stop consuming from Kafka when Elasticsearch is unreachable.


Which scenario are you testing? Logstash failure or Elasticsearch failure?

By default Logstash has an in-memory queue to buffer events. In the case of an output failure, it will buffer events until the queue is full and then apply back pressure to the input. Each input responds differently to this; if I'm not wrong, the kafka input will stop consuming from Kafka until the output is back and the queue frees some space.

You can also use persistent queues, which buffer the events on disk instead of in memory; they deal with output failure the same way as the in-memory queue.
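For reference, the persistent queue is enabled in logstash.yml via `queue.type`. A minimal sketch (the `queue.max_bytes` value below is just an illustrative assumption, not a recommendation):

```
# logstash.yml
# Switch from the default in-memory queue to the on-disk persistent queue
queue.type: persisted
# Optional cap on the on-disk queue size (illustrative value)
queue.max_bytes: 4gb
```

With `queue.type: persisted`, events acknowledged into the queue survive a normal restart; as noted above, a hard crash can still lose events not yet committed to the checkpoint file.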

In the case of a sudden crash of the Logstash service, both types of queue can lead to data loss: the in-memory queue will lose everything, while the persistent queue will lose only the data that was not yet committed to the checkpoint file.

About the kafka input: Logstash commits the offset when it consumes a batch of messages, not when the batch is indexed in Elasticsearch. So if Logstash crashes before those messages are indexed, you will need to reset the offset in Kafka back to the last offset that was actually indexed.

To reset the offsets in Kafka, you will need to stop the Logstash instances that are consuming from the topic and follow the Kafka documentation.
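As a sketch of the reset step, Kafka ships with the `kafka-consumer-groups.sh` tool. The group and topic names below come from the config in the question; the broker address, the SSL client properties file, and the target offset are placeholders you would fill in yourself:

```shell
# Stop Logstash first so the consumer group has no active members, then:
bin/kafka-consumer-groups.sh \
  --bootstrap-server <broker-host:port> \
  --command-config <client-ssl.properties> \
  --group logstash-0 \
  --topic test-test \
  --reset-offsets \
  --to-offset <last-indexed-offset + 1> \
  --execute
```

Run it first with `--dry-run` instead of `--execute` to preview the change before applying it.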

To know what the last offset was, enable the option decorate_events => true in your Logstash kafka input and save the [@metadata][kafka][offset] field into a field of your document using a mutate filter.

Something like this:

filter {
  mutate {
    add_field => { "[kafkaOffset]" => "%{[@metadata][kafka][offset]}" }
  }
}
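Once the offset is stored in the documents, one way to find where to reset to is a max aggregation on that field. This is a hypothetical query sketch: it reuses the index, user, and CA path from the config above, and it assumes kafkaOffset is mapped as a number (add_field produces a string, so you may need a mutate convert or an index template for that):

```shell
curl -s --cacert /opt/logstash/config/elasticsearch-ca.pem \
  -u logstash_internal:new_pass \
  "https://<es-host>:9200/test-replication/_search" \
  -H 'Content-Type: application/json' -d '{
    "size": 0,
    "aggs": { "last_offset": { "max": { "field": "kafkaOffset" } } }
  }'
```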

In the second part I stop Elasticsearch, and in the third part I kill Logstash. In the end, the data from part 2 is never written to Elasticsearch.

Thanks very much. I'll try your advice.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.