Logstash not stopping when Elasticsearch is down, although persistent pipeline queues are enabled

Hi,

I am using multiple pipelines in Logstash. I have run into problems before when I want to stop Logstash while Elasticsearch is unavailable.
I thought the issue was the following:

  • Logstash reads events from Redis, processes them, and puts the processed events into the commonOut pipeline of Logstash.
  • This common output pipeline gets stuck when Elasticsearch is unavailable.

To mitigate this I thought the right approach would be to enable persistent queueing inside Logstash, so I enabled persistence for all queues. I can see the queue files on the file system.

But when I first stop Elasticsearch and then try to stop Logstash, it still hangs:

[2022-11-08T10:18:54,406][INFO ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Failed to perform request {:message=>"Connect to elastic01.example.com:9200 [elastic01.example.com/xxx.xxx.xxx.xxx] failed: Connection refused", :exception=>Manticore::SocketException, :cause=>#<Java::OrgApacheHttpConn::HttpHostConnectException: Connect to elastic01.example.com:9200 [elastic01.example.com/xxx.xxx.xxx.xxx] failed: Connection refused>}
[2022-11-08T10:18:54,407][WARN ][logstash.outputs.elasticsearchmonitoring][.monitoring-logstash] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"https://logstash_system:xxxxxx@elastic01.example.com:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [https://elastic01.example.com:9200/][Manticore::SocketException] Connect to elastic01.example.com:9200 [elastic01.example.com/xxx.xxx.xxx.xxx] failed: Connection refused"}
[2022-11-08T10:18:54,647][INFO ][logstash.outputs.elasticsearch][commonOutElasticsearch] Failed to perform request {:message=>"Connect to elastic01.example.com:9200 [elastic01.example.com/xxx.xxx.xxx.xxx] failed: Connection refused", :exception=>Manticore::SocketException, :cause=>#<Java::OrgApacheHttpConn::HttpHostConnectException: Connect to elastic01.example.com:9200 [elastic01.example.com/xxx.xxx.xxx.xxx] failed: Connection refused>}
[2022-11-08T10:18:54,648][WARN ][logstash.outputs.elasticsearch][commonOutElasticsearch] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"https://logstash_ingest:xxxxxx@elastic01.example.com:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [https://elastic01.example.com:9200/][Manticore::SocketException] Connect to elastic01.example.com:9200 [elastic01.example.com/xxx.xxx.xxx.xxx] failed: Connection refused"}
[2022-11-08T10:18:55,266][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>0, "stalling_threads_info"=>{"other"=>[{"thread_id"=>68, "name"=>"[commonOutElasticsearch]>worker0", "current_call"=>"[...]/vendor/bundle/jruby/2.6.0/gems/stud-0.0.23/lib/stud/interval.rb:89:in `sleep'"}, {"thread_id"=>69, "name"=>"[commonOutElasticsearch]>worker1", "current_call"=>"[...]/vendor/bundle/jruby/2.6.0/gems/stud-0.0.23/lib/stud/interval.rb:89:in `sleep'"}, {"thread_id"=>70, "name"=>"[commonOutElasticsearch]>worker2", "current_call"=>"[...]/vendor/bundle/jruby/2.6.0/gems/stud-0.0.23/lib/stud/interval.rb:89:in `sleep'"}, {"thread_id"=>71, "name"=>"[commonOutElasticsearch]>worker3", "current_call"=>"[...]/vendor/bundle/jruby/2.6.0/gems/stud-0.0.23/lib/stud/interval.rb:89:in `sleep'"}, {"thread_id"=>72, "name"=>"[commonOutElasticsearch]>worker4", "current_call"=>"[...]/vendor/bundle/jruby/2.6.0/gems/stud-0.0.23/lib/stud/interval.rb:89:in `sleep'"}, {"thread_id"=>73, "name"=>"[commonOutElasticsearch]>worker5", "current_call"=>"[...]/vendor/bundle/jruby/2.6.0/gems/stud-0.0.23/lib/stud/interval.rb:89:in `sleep'"}, {"thread_id"=>74, "name"=>"[commonOutElasticsearch]>worker6", "current_call"=>"[...]/vendor/bundle/jruby/2.6.0/gems/stud-0.0.23/lib/stud/interval.rb:89:in `sleep'"}, {"thread_id"=>76, "name"=>"[commonOutElasticsearch]>worker7", "current_call"=>"[...]/vendor/bundle/jruby/2.6.0/gems/stud-0.0.23/lib/stud/interval.rb:89:in `sleep'"}]}}
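
As far as I can tell from the ShutdownWatcher output there are no in-flight events (inflight_count is 0), but the commonOutElasticsearch workers keep sleeping in the elasticsearch output's retry loop, so the shutdown never completes.

For reference, these are the shutdown-related settings in logstash.yml that I am aware of. The values below are only what I am experimenting with, and I am not sure this is the right approach:

# logstash.yml (excerpt, values illustrative)
queue.drain: false                # if true, Logstash tries to empty the persisted queue before shutting down
pipeline.unsafe_shutdown: false   # if true, Logstash may force a shutdown even while plugins are stalled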

My pipelines.yml looks like this:



- pipeline.id: generic_json
  pipeline.workers: 8
  queue.type: persisted
  queue.max_bytes: ${LOGSTASH_QUEUE_MAX_BYTES_FILTER:100mb}
  path.config: "/etc/logstash/config_sets/all/0/pipelines/generic_json"

- pipeline.id: metricbeat
  pipeline.workers: 8
  queue.type: persisted
  queue.max_bytes: ${LOGSTASH_QUEUE_MAX_BYTES_FILTER:100mb}
  path.config: "/etc/logstash/config_sets/all/0/pipelines/metricbeat"


- pipeline.id: commonOutElasticsearch
  pipeline.workers: 8
  queue.type: persisted
  queue.max_bytes: ${LOGSTASH_QUEUE_MAX_BYTES_OUTPUT:1gb}
  path.config: "/etc/logstash/config_sets/all/0/pipelines/output_es_common_https"

(The first two pipelines read from their specific Redis keys and do some filtering and processing. Both then send their events to the commonOutElasticsearch pipeline, roughly as sketched below.)
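
The wiring between those pipelines is the usual pipeline-to-pipeline setup. A simplified sketch (the virtual address "commonOut" is only illustrative, not my real name for it):

# output section of generic_json and metricbeat
output {
  pipeline {
    send_to => ["commonOut"]
  }
}

# input section of commonOutElasticsearch
input {
  pipeline {
    address => "commonOut"
  }
}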

The output section of the commonOutElasticsearch pipeline looks like this:

output
{
  elasticsearch
  {
    hosts           => "${ES_HOSTS}"
    ssl             => "${USE_ES_SSL}"
    cacert          => "${ES_CA_CERT_PATH}"
    ssl_certificate_verification => "${USE_ES_OUTPUT_SSL_CERT_VERIFICATION}"

    # credentials are fetched from environment or logstash-keystore
    user            => "${LOGSTASH_USER}"
    password        => "${LOGSTASH_PASSWORD}"

    action   => create
    pipeline => "plx_pipeline"

  }
}

My expectation when introducing the persisted queue was:

  • Logstash will read an event from its queue.
  • It will process the event.
  • If the output is available, it sends the processed event to the output and then removes (ACKs) it from the queue (see the sketch after this list).
  • If the output is unavailable and Logstash is shutting down, it will drop the processed event that is only in memory. The original data is still in the queue, so it will not be lost when Logstash shuts down. On the next restart the event will be fetched from the queue and processed again.
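
To make that expectation concrete: as I understand it, "removes it from the queue" corresponds to the event being acknowledged and later checkpointed in the persisted queue. A sketch of how the output pipeline entry in pipelines.yml could carry the checkpoint settings (the values are the defaults as far as I know, and purely illustrative):

- pipeline.id: commonOutElasticsearch
  pipeline.workers: 8
  queue.type: persisted
  queue.max_bytes: ${LOGSTASH_QUEUE_MAX_BYTES_OUTPUT:1gb}
  queue.checkpoint.writes: 1024   # checkpoint after this many written events
  queue.checkpoint.acks: 1024     # checkpoint after this many acknowledged events
  path.config: "/etc/logstash/config_sets/all/0/pipelines/output_es_common_https"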

So what is the right approach to be able to stop Logstash at any time WITHOUT DATA LOSS, even if Elasticsearch / the output is unavailable?

Thanks, Andreas

Bump. Can anyone help?
