Replaying Dead Letter messages via RabbitMQ

Logstash and Elasticion version: 5.6.9

Setting up multiple pipelines so that any messages that end up in Logstash's Dead Letter get sent back to RabbitMQ so that we can re-process them after sorting out any potential mapping issues (Further to Logstash Dead Letter Messages back to RabbitMQ)

Defined the following pipelines in Logstash:

########### Actual pipeline sending to Elastic ###########

input {
  rabbitmq {
    host => "rabbitmq"
    port => 5672
    user => "test"
    password => "test"
    vhost => "test"
    queue => "test.queue"
    codec => "json"
    metadata_enabled => true
    passive => true
    ack => true
    prefetch_count => 1000
    threads => 5
    tags => "audit"
    heartbeat => "5"
    }
}

filter {
  if "audit" in [tags] {
    mutate {
      rename => { "messageTemplate" => "message" }
      remove_field => [ "@version" ]
      remove_field => [ "[fields][PerformanceType]" ]
   }
    if ![target_index] {
      mutate {
        add_field => { "target_index" => "%{[@metadata][rabbitmq_headers][x-target-index]}" }
      }
    }
  }
}


output {
  if "audit" in [tags] {
     elasticsearch {
        hosts => ["elastic01","elastic02","elastic03"]
        index => "%{target_index}"
        template => "/etc/logstash/templates/template1"
        template_name => "audit"
     }
  }
}


####### Dead Letter Pipeline to send to RabbitMQ #########

input {
  dead_letter_queue {
    path => "/opt/logstash/dead_letter_queue"
    commit_offsets => true
    sincedb_path => "/opt/logstash/dead_letter_queue/sincedb"
    tags => "dlx"
  }
}

filter {
  if "dlx" in [tags] {
        if ![rabbitmq.dlx.retry.count] {
                mutate { add_field => {"rabbitmq.dlx.retry.count" => "0" } }
                mutate { convert => { "rabbitmq.dlx.retry.count" => "integer" } }
        } else {
                ruby { code => 'event.set("rabbitmq.dlx.retry.count", event.get("rabbitmq.dlx.retry.count").to_i + 1)' }
        }
   if ![rabbitmq.dlx.reason] {
      mutate { add_field => { "rabbitmq.dlx.reason" => "%{[@metadata][dead_letter_queue][reason]}" } }
   } else {
      mutate { update => { "rabbitmq.dlx.reason" => "%{[@metadata][dead_letter_queue][reason]}" } }
   }
  }
}

output {
  if "dlx" in [tags] {
  rabbitmq {
    host => "rabbitmq"
    port => 5672
    user => "test"
    password => "test"
    vhost => "test"
    exchange => "test.dlx.exchange"
    exchange_type => "fanout"
    codec => "json"
    heartbeat => "5"
  }
  }
}



####### Healing Pipeline to sort out any mapping errors. No Output defined here as the removal of the dlx tag implies that the message goes through Output with audit in tags #########

input {
  rabbitmq {
    host => "rabbitmq"
    port => 5672
    user => "test"
    password => "test"
    vhost => "test"
    queue => "test.heal.queue"
    codec => "json"
    metadata_enabled => true
    passive => true
    ack => true
    prefetch_count => 1000
    threads => 5
    tags => "audit_heal"
    heartbeat => "5"
    }
}

filter {
  if "audit_heal" in [tags] {
    mutate {
      rename => { "messageTemplate" => "message" }
      remove_field => [ "@version" ]
      remove_field => [ "[fields][PerformanceType]" ]
      remove_tag => [ "dlx" ]
   }
  }
}

In a nutshell, any messages that end up in Logstash's dead letter get sent to a special dlx queue in RabbitMQ with two further fields being a retry counter and the reason for dead letter. We then move the messages to a "heal" queue to process and re-attempt to send to Elastic.

My questions are:

  1. If the message ends up in dead letter again after trying to fix it, will it be as a new document? Or will logstash consider it as an existing document in its dead letter and just leave it there?

  2. Is there a way to forcefully create mapping errors in Logstash to cycle the same messages through logstash so I can test out the retry counter and see the dead letter reason field update on each try?

  3. I noticed warnings in the logs event previously submitted to dead letter queue. Skipping... which I'm not sure why I'm getting as I have sincedb setup in the config?

Thanks in advance!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.