Logstash Dead Letter Messages back to RabbitMQ


(David Bugeja) #1

Logstash and Elasticsearch versions: 5.6.9

We're currently consuming messages from RabbitMQ with Logstash and outputting them to Elasticsearch. We noticed some requests failing with 400 responses, so we enabled the Logstash dead letter queue (DLQ) to ensure messages are not lost.

We have a second input that reads from the dead letter queue and outputs these messages back to RabbitMQ, into a dead letter exchange there. This makes it easy to replay the messages once we have figured out why we got the 400 and fixed the cause.
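A minimal sketch of such a replay path. It assumes the DLQ is enabled with `dead_letter_queue.enable: true` in logstash.yml, that this runs as a separate Logstash config, and that the DLQ sits under the default data directory of a package install. The path, host, exchange name `dlx`, and field name `dlq_reason` are placeholders, not values from this thread. The `dead_letter_queue` input records why an event was rejected under `[@metadata][dead_letter_queue]`. Because `@metadata` is not serialized by the json codec, the sketch copies the reason into a regular field first.

```logstash
input {
  dead_letter_queue {
    # Top-level DLQ directory (placeholder; the default is <path.data>/dead_letter_queue)
    path => "/var/lib/logstash/dead_letter_queue"
    # DLQ of the pipeline to read; "main" is the default pipeline id
    pipeline_id => "main"
    # Persist the read position so entries are not re-published after a restart
    commit_offsets => true
  }
}

filter {
  # @metadata is dropped when the event is encoded as JSON, so keep the
  # rejection reason in a regular (hypothetical) field
  mutate {
    add_field => { "dlq_reason" => "%{[@metadata][dead_letter_queue][reason]}" }
  }
}

output {
  rabbitmq {
    host => "rabbitmq"          # placeholder host
    exchange => "dlx"           # hypothetical dead letter exchange
    exchange_type => "fanout"   # must match the type of an existing exchange
    durable => true
    persistent => true
    codec => "json"
  }
}
```

Note that, as this thread goes on to discuss, this does not recreate the original message's RabbitMQ headers on the republished message.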

What we noticed is that the headers on the original RabbitMQ message are dropped by the time the message reaches the RabbitMQ output queue after passing through the dead letter queue.

Is this something we are missing or is this expected behaviour?


(Magnus Bäck) #2

That's expected. The rabbitmq input can optionally add the headers to the event, but preserving them when the event is sent to the dead letter queue is up to you. And if you don't want the headers indexed in ES, I doubt there's a way for you to get exactly what you want.


(David Bugeja) #3

Hi Magnus,

Thanks for your reply! My question would be, how do you preserve them in the dead letter queue? And if they are preserved there, can they be sent back to RabbitMQ via the rabbitmq output plugin?


(Magnus Bäck) #4

My question would be, how do you preserve them in the dead letter queue?

Enable the rabbitmq input's metadata_enabled option, then use a mutate filter to move [@metadata][rabbitmq_headers] to a regular field outside @metadata.
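A minimal sketch of that suggestion. With `metadata_enabled`, the rabbitmq input stores message headers in `[@metadata][rabbitmq_headers]`; the mutate filter then renames that hash to an ordinary top-level field. The target name `rabbitmq_headers` is an arbitrary choice, and the host and queue are placeholders. As mentioned earlier in the thread, the headers will then also be indexed in Elasticsearch unless they are removed before that output.

```logstash
input {
  rabbitmq {
    host => "rabbitmq"   # placeholder
    queue => "queue"     # placeholder
    # Store message headers and properties under @metadata
    metadata_enabled => true
  }
}

filter {
  mutate {
    # Move the headers out of @metadata so they remain part of the event
    # (and therefore of what ends up in the dead letter queue)
    rename => { "[@metadata][rabbitmq_headers]" => "rabbitmq_headers" }
  }
}
```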

And if they are preserved there, can they be sent back to RabbitMQ via the rabbitmq output plugin?

Judging by https://github.com/logstash-plugins/logstash-output-rabbitmq/issues/6 that's currently not possible.


(David Bugeja) #5

Thanks. We're trying the following to preserve the index name as a separate field, so that if the message does end up in the dead letter queue, we can replay it back into the original RabbitMQ queue:

filter {
  mutate {
    ...
    add_field => { "[@metadata][rabbitmq_headers][x-target-index]" => "target_index" }
    ...
  }
}

Then in the output:

output {
  elasticsearch {
    hosts => ["..."]
    index => "%{[fields][target_index]}"
    ...
  }
}

It seems the field doesn't end up in the document, and the index name ends up as the literal string "%{[fields][target_index]}".

Anything I'm missing?
Thanks in advance!


(Magnus Bäck) #6

add_field => { "[@metadata][rabbitmq_headers][x-target-index]" => "target_index" }

This is backwards. Change to:

add_field => { "target_index" => "[@metadata][rabbitmq_headers][x-target-index]" }

index => "%{[fields][target_index]}"

The name of the field is target_index, so either drop [fields] or update the add_field stanza to actually save the x-target-index header in the [fields][target_index] field.


(David Bugeja) #7

So I changed my mutate filter to the following:

    add_field => { "target_index" => "[@metadata][rabbitmq_headers][x-target-index]" }

And in the output:

    index => "%{target_index}"

Now I get an index literally named "[@metadata][rabbitmq_headers][x-target-index]".


(Magnus Bäck) #8

Did you enable metadata collection in the rabbitmq input?

You might want to include a stdout { codec => rubydebug { metadata => true } } output to dump the raw contents of an event, including @metadata.
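For reference, a temporary debug output could sit alongside the existing one like this. With `metadata => true` the rubydebug codec also prints `@metadata`, so you can check whether `[@metadata][rabbitmq_headers]` is actually populated. The hosts value is left elided as in the original config.

```logstash
output {
  # Temporary: print every event, including @metadata, to stdout.
  # Remove once debugging is done, since it prints every single event.
  stdout { codec => rubydebug { metadata => true } }

  elasticsearch {
    hosts => ["..."]
    index => "%{target_index}"
  }
}
```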


(David Bugeja) #9

Yes, it has always been enabled. Here's the full config, to clarify:

input {
  rabbitmq {
    host => "rabbitmq"
    port => 5672
    user => "..."
    password => "..."
    vhost => "..."
    queue => "queue"
    codec => "json"
    metadata_enabled => true
    passive => true
    ack => true
    prefetch_count => 1000
    threads => 5
    tags => "audit"
    heartbeat => "5"
  }
}

filter {
  if "audit" in [tags] {
    mutate {
      rename => { "messageTemplate" => "message" }
      remove_field => [ "@version", "[fields][PerformanceType]" ]
      add_field => { "target_index" => "[@metadata][rabbitmq_headers][x-target-index]" }
    }
  }
}

output {
  if "audit" in [tags] {
    elasticsearch {
      hosts => ["elastic01","elastic02","elastic03"]
      index => "%{target_index}"
      template => "/etc/logstash/templates/audit.template"
      template_name => "audit"
    }
  }
}

(David Bugeja) #10

To clarify, using [@metadata][rabbitmq_headers][x-target-index] directly for the index name in the output works, but it defeats the purpose of wanting to store the index name as a field in the first place.

Does the dead letter for Logstash preserve which index name the document should have gone to when it gave the 400/404 error so long as the error is not related to the index name itself?

If that's the case, perhaps we can work around all of this with a separate pipeline that has its own rabbitmq input on a different queue, to which messages can be moved manually once any fixes are done. That pipeline could then take the index name directly from the messages that the dead letter queue sent back to RabbitMQ in the first place?
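A rough sketch of what such a replay config might look like. It assumes the messages in the replay queue are the JSON events that the dead letter pipeline published, and that they still carry `target_index` as a regular field. The queue name `audit-replay` is hypothetical. Events without a recorded index are printed for inspection instead of being indexed under a guessed name.

```logstash
input {
  rabbitmq {
    host => "rabbitmq"
    queue => "audit-replay"   # hypothetical queue that fixed messages are moved to
    codec => "json"
    passive => true           # the queue must already exist
    ack => true
  }
}

output {
  if [target_index] {
    elasticsearch {
      hosts => ["elastic01","elastic02","elastic03"]
      # Route each message back to the index recorded on first processing
      index => "%{target_index}"
    }
  } else {
    # No recorded index: print the event rather than guess where it belongs
    stdout { codec => rubydebug }
  }
}
```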

Thanks in advance.


(Magnus Bäck) #11
add_field => { "target_index" => "[@metadata][rabbitmq_headers][x-target-index]" }

Oops, sorry. You're missing %{...} around the field name.

 add_field => { "target_index" => "%{[@metadata][rabbitmq_headers][x-target-index]}" }
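Combining that fix with the filter posted earlier, one way to write it is shown below. The conditional is an addition, not part of this thread. If a message arrives without the x-target-index header, the %{...} reference is not resolved and the literal text would become the index name. So this sketch falls back to a hypothetical index `audit-unrouted`. The output can keep using index => "%{target_index}".

```logstash
filter {
  if "audit" in [tags] {
    mutate {
      rename => { "messageTemplate" => "message" }
      remove_field => [ "@version", "[fields][PerformanceType]" ]
    }
    if [@metadata][rabbitmq_headers][x-target-index] {
      # %{...} interpolates the header's value; without it the field
      # reference itself would be stored as a literal string
      mutate {
        add_field => { "target_index" => "%{[@metadata][rabbitmq_headers][x-target-index]}" }
      }
    } else {
      # Hypothetical fallback for messages that lack the header
      mutate { add_field => { "target_index" => "audit-unrouted" } }
    }
  }
}
```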

Does the dead letter for Logstash preserve which index name the document should have gone to when it gave the 400/404 error so long as the error is not related to the index name itself?

I don't know, but probably not.


(David Bugeja) #12

All good now, thanks a lot!

