I've noticed a problem when using RabbitMQ (with ACKing) as the input and Elasticsearch as the output in Logstash: when the output is unresponsive for some reason (being restarted, connection loss), Logstash still consumes messages from RabbitMQ, and those messages are lost.

Is there any way to prevent this?

My pipeline config:

input {
  rabbitmq {
    id => "rabbit01"
    host => "${RABBITMQ_HOST}"
    port => "${RABBITMQ_PORT:5672}"
    user => "${RABBITMQ_USER}"
    password => "${RABBITMQ_PASSWORD}"
    passive => true
    queue => "${RABBITMQ_QUEUE_NAME_SAVE}"
  }
}

output {
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URI}"]
    index => "my_index"
    document_id => "%{[gid]}"
    doc_as_upsert => true
  }
}

Plugin list:

bash-4.2$ bin/logstash-plugin list --verbose | grep -E 'elastic|rabbit'
logstash-filter-elasticsearch (3.6.0)
logstash-input-elasticsearch (4.3.2)
logstash-input-rabbitmq (6.0.3)
logstash-output-elastic_app_search (1.0.0)
logstash-output-elasticsearch (10.1.0)
logstash-output-rabbitmq (5.1.1)

Persistent queues might help.
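
As a rough sketch of what that could look like: persistent queues are enabled in logstash.yml (not in the pipeline config), so that events the input has already accepted are buffered on disk instead of only in memory. The size and path below are illustrative assumptions, not values taken from the question, and should be adjusted to your disk capacity and message volume.

```yaml
# logstash.yml
# Buffer events on disk between the inputs and the filter/output stage.
queue.type: persisted

# Assumed example capacity; when the queue is full, Logstash applies
# back-pressure to the inputs instead of accepting more events.
queue.max_bytes: 4gb

# Optional, assumed example location; if omitted, the queue is stored
# under path.data.
path.queue: /usr/share/logstash/data/queue
```

With this in place, events that were consumed from RabbitMQ but not yet indexed should survive a Logstash restart and be replayed once Elasticsearch is reachable again. It is worth testing by stopping Elasticsearch while messages are flowing and checking that nothing is missing afterwards.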

