Logstash and Elasticion version: 5.6.9
Setting up multiple pipelines so that any messages that end up in Logstash's Dead Letter get sent back to RabbitMQ so that we can re-process them after sorting out any potential mapping issues (Further to Logstash Dead Letter Messages back to RabbitMQ)
Defined the following pipelines in Logstash:
########### Actual pipeline sending to Elastic ###########
input {
rabbitmq {
host => "rabbitmq"
port => 5672
user => "test"
password => "test"
vhost => "test"
queue => "test.queue"
codec => "json"
metadata_enabled => true
passive => true
ack => true
prefetch_count => 1000
threads => 5
tags => "audit"
heartbeat => "5"
}
}
filter {
if "audit" in [tags] {
mutate {
rename => { "messageTemplate" => "message" }
remove_field => [ "@version" ]
remove_field => [ "[fields][PerformanceType]" ]
}
if ![target_index] {
mutate {
add_field => { "target_index" => "%{[@metadata][rabbitmq_headers][x-target-index]}" }
}
}
}
}
output {
if "audit" in [tags] {
elasticsearch {
hosts => ["elastic01","elastic02","elastic03"]
index => "%{target_index}"
template => "/etc/logstash/templates/template1"
template_name => "audit"
}
}
}
####### Dead Letter Pipeline to send to RabbitMQ #########
input {
dead_letter_queue {
path => "/opt/logstash/dead_letter_queue"
commit_offsets => true
sincedb_path => "/opt/logstash/dead_letter_queue/sincedb"
tags => "dlx"
}
}
filter {
if "dlx" in [tags] {
if ![rabbitmq.dlx.retry.count] {
mutate { add_field => {"rabbitmq.dlx.retry.count" => "0" } }
mutate { convert => { "rabbitmq.dlx.retry.count" => "integer" } }
} else {
ruby { code => 'event.set("rabbitmq.dlx.retry.count", event.get("rabbitmq.dlx.retry.count").to_i + 1)' }
}
if ![rabbitmq.dlx.reason] {
mutate { add_field => { "rabbitmq.dlx.reason" => "%{[@metadata][dead_letter_queue][reason]}" } }
} else {
mutate { update => { "rabbitmq.dlx.reason" => "%{[@metadata][dead_letter_queue][reason]}" } }
}
}
}
output {
if "dlx" in [tags] {
rabbitmq {
host => "rabbitmq"
port => 5672
user => "test"
password => "test"
vhost => "test"
exchange => "test.dlx.exchange"
exchange_type => "fanout"
codec => "json"
heartbeat => "5"
}
}
}
####### Healing Pipeline to sort out any mapping errors. No Output defined here as the removal of the dlx tag implies that the message goes through Output with audit in tags #########
input {
rabbitmq {
host => "rabbitmq"
port => 5672
user => "test"
password => "test"
vhost => "test"
queue => "test.heal.queue"
codec => "json"
metadata_enabled => true
passive => true
ack => true
prefetch_count => 1000
threads => 5
tags => "audit_heal"
heartbeat => "5"
}
}
filter {
if "audit_heal" in [tags] {
mutate {
rename => { "messageTemplate" => "message" }
remove_field => [ "@version" ]
remove_field => [ "[fields][PerformanceType]" ]
remove_tag => [ "dlx" ]
}
}
}
In a nutshell, any messages that end up in Logstash's dead letter get sent to a special dlx queue in RabbitMQ with two further fields being a retry counter and the reason for dead letter. We then move the messages to a "heal" queue to process and re-attempt to send to Elastic.
My questions are:
-
If the message ends up in dead letter again after trying to fix it, will it be as a new document? Or will logstash consider it as an existing document in its dead letter and just leave it there?
-
Is there a way to forcefully create mapping errors in Logstash to cycle the same messages through logstash so I can test out the retry counter and see the dead letter reason field update on each try?
-
I noticed warnings in the logs
event previously submitted to dead letter queue. Skipping...
which I'm not sure why I'm getting as I have sincedb setup in the config?
Thanks in advance!