I'm currently experimenting with the dead letter queue feature of Logstash. First I tried commit_offsets => false
so that I could set everything up, and as expected the DLQ events were re-ingested after each restart of Logstash.
Now that I'm happy with the extra dead-letter-queue pipeline, which ingests a failed event just without much grokking and so on, I switched to commit_offsets => true
. But the DLQ events still get re-ingested after each restart of Logstash. Am I doing something wrong, is this option simply not working in Logstash 7.9.1, or do I misunderstand how commit_offsets
works?
The DLQ Pipeline looks similar to:
# the pipeline for DLQ messages
# they just get ingested with some tags and without any pattern-analyzing
input {
  # Read events that other pipelines wrote to the dead letter queue.
  # NOTE(review): both inputs below read the same DLQ path. The
  # dead_letter_queue input tracks its committed position in a sincedb file;
  # by default that file lives under <path.data>/plugins/inputs/dead_letter_queue.
  # When two inputs in the same pipeline share the default location they can
  # overwrite each other's offsets, which makes commit_offsets => true appear
  # to have no effect (every event is re-ingested after each restart).
  # Giving each input its own explicit sincedb_path keeps the committed
  # positions separate — confirm the directory exists and is writable.
  dead_letter_queue {
    path => "/data/logstash/dead_letter_queue"
    # start from last known position
    commit_offsets => true
    # for debugging always send all messages from dlq
    # commit_offsets => false
    # per-input offset tracking file (must be a file path, not a directory)
    sincedb_path => "/data/logstash/dlq-sincedb/filter-syslog.sincedb"
    pipeline_id => "filter-syslog"
    tags => [ "syslog" ]
    type => "syslog"
  }
  dead_letter_queue {
    path => "/data/logstash/dead_letter_queue"
    # start from last known position
    commit_offsets => true
    # for debugging always send all messages from dlq
    # commit_offsets => false
    # per-input offset tracking file (must be a file path, not a directory)
    sincedb_path => "/data/logstash/dlq-sincedb/filter-filebeat.sincedb"
    pipeline_id => "filter-filebeat"
    tags => [ "filebeat" ]
    type => "filebeat"
  }
}
filter {
  if "syslog" in [tags] {
    # keep only the listed fields; everything else is dropped
    prune {
      id => "prune_1_1"
      whitelist_names => [ "@timestamp", "@metadata", "event" ]
    }
    # NOTE(review): the original config declared add_tag twice inside one
    # mutate block. Repeating the same option within a plugin section does
    # not append — newer Logstash versions reject it ("Duplicate keys found")
    # and older ones silently keep only one value. Use a single array.
    mutate {
      id => "mutate_1_1"
      add_tag => [ "syslog-${LOGSTASH_FILTER_TAG}", "syslog" ]
    }
  }
  if "filebeat" in [tags] {
    # keep only the listed fields; everything else is dropped
    prune {
      id => "prune_1_2"
      whitelist_names => [ "@timestamp", "@metadata", "event", "agent", "beat" ]
    }
    # single add_tag array instead of duplicate add_tag options (see above)
    mutate {
      id => "mutate_1_2"
      add_tag => [ "filebeat-${LOGSTASH_FILTER_TAG}", "filebeat" ]
    }
  }
  # we add the message field with the content of the original event sent,
  # and keep a raw copy of the DLQ metadata; one add_field hash instead of
  # two duplicate add_field options
  mutate {
    id => "mutate_1_x"
    add_field => {
      "message" => "%{[event][original]}"
      "[dlq][original]" => "%{[@metadata][dead_letter_queue]}"
    }
    add_tag => [ "dlq" ]
  }
  if "dlq" in [tags] {
    json {
      id => "json_1"
      source => "[dlq][original]"
      # seems to overwrite everything - will add original later again
      target => "[dlq]"
      # to suppress warnings and tagging with "_jsonparsefailure"
      # skip_on_invalid_json => true
    }
    if "_jsonparsefailure" not in [tags] {
      # [dlq] was replaced wholesale by the json filter above, so re-add the
      # raw copy and stash [dlq][reason] in a temp field. The remove/re-add
      # steps below stay in separate mutate blocks on purpose: mutate applies
      # its operations in a fixed internal order, not the order written.
      mutate {
        id => "mutate_2"
        add_field => {
          "[dlq][original]" => "%{[@metadata][dead_letter_queue]}"
          "[dlq][reason_temp]" => "%{[dlq][reason]}"
        }
      }
      # drop the scalar reason so it can be rebuilt as an object below
      mutate {
        id => "mutate_3"
        remove_field => [ "[dlq][reason]" ]
      }
      # re-add the saved value as [dlq][reason][original]
      mutate {
        id => "mutate_4"
        add_field => { "[dlq][reason][original]" => "%{[dlq][reason_temp]}" }
      }
      # and remove the no-longer-needed temp field again
      mutate {
        id => "mutate_5"
        remove_field => [ "[dlq][reason_temp]" ]
      }
      # grok the original reason into more readable fields
      # (message / status / action) - this can be improved further later
      grok {
        id => "grok_1"
        match => { "[dlq][reason][original]" => "%{GREEDYDATA:[dlq][reason][message]}. status: %{POSINT:[dlq][reason][status]}, action: %{GREEDYDATA:[dlq][reason][action]}" }
      }
    }
  }
}
output {
  # Route re-processed DLQ events to the matching ILM-managed index.
  # NOTE(review): the original config set codec => json_lines on the
  # elasticsearch outputs. The elasticsearch output serializes events to
  # JSON for the bulk API itself and does not use a line-oriented codec,
  # so that setting was ineffective and has been removed.
  # NOTE(review): consider moving user/password into the Logstash keystore
  # instead of keeping the plaintext password in this file.
  if [event][dataset] == "syslog" {
    elasticsearch {
      id => "elasticsearch_logstash_syslog_ilm_dlq"
      manage_template => false
      cacert => "/etc/logstash/certs/elasticsearch-ca.pem"
      user => "logstash_internal"
      password => "supersecure"
      ssl => true
      ilm_enabled => "true"
      ilm_rollover_alias => "logstash_syslog_ilm"
      ilm_pattern => "{now/d}-000001"
      ilm_policy => "logstash-syslog"
      # the elasticsearch output plugin will loadbalance :D - IPs obfuscated
      hosts => [ "https://123.123.123.123:9200", "https://123.123.123.124:9200" ]
    }
  }
  if [event][dataset] == "filebeat" {
    elasticsearch {
      id => "elasticsearch_logstash_filebeat_ilm_dlq"
      manage_template => false
      cacert => "/etc/logstash/certs/elasticsearch-ca.pem"
      user => "logstash_internal"
      password => "supersecure"
      ssl => true
      ilm_enabled => "true"
      ilm_rollover_alias => "logstash_filebeat_ilm"
      ilm_pattern => "{now/d}-000001"
      ilm_policy => "logstash-filebeat"
      # the elasticsearch output plugin will loadbalance :D - IPs obfuscated
      hosts => [ "https://123.123.123.123:9200", "https://123.123.123.124:9200" ]
    }
  }
  # debug copy of every re-processed DLQ event
  if "dlq" in [tags] {
    file {
      id => "file_dlq_debug"
      path => "/data/plain-log-archive/dlq-debug.txt"
    }
  }
}