DLQ pipeline not showing fields added in the filter section of the Logstash pipeline in the data stream

We are running Elastic Stack version 7.13. Winlogbeat messages are ingested into the Elasticsearch cluster, and the dead letter queue is filling up, which also causes "/var/log/messages" to fill up with messages containing "cannot write event to DLQ(path: /data/dead_letter_queue/beats_logs): reached maxQueueSize of 1073741824".

"/var/log" fills up quickly because of these messages, and we have to purge the "/var/log/messages" file. I am trying to find the root cause of the issue and fix it by configuring a Logstash pipeline for the DLQ (dead_letter_queue). The issue I am running into is that the fields added in the filter section of the pipeline are not added to the index of the data stream.
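For context, the size limit mentioned in that log message is controlled by the DLQ settings in logstash.yml on the pipeline that writes to the DLQ; a minimal sketch (the 2gb value is just an illustrative choice, not a recommendation):

# logstash.yml -- settings on the pipeline that writes to the DLQ
dead_letter_queue.enable: true
# the default is 1024mb (1073741824 bytes), which matches the error above;
# raising it only buys time unless the DLQ is also being drained
dead_letter_queue.max_bytes: 2gb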

Logstash pipeline details below:

input {
  dead_letter_queue {
    id => "deadletter"
    path => "/data/dead_letter_queue"
    commit_offsets => true
    pipeline_id => "beats_logs"
  }
}

filter {
  mutate {
    add_field => {
      "error_reason" => "%{[@metadata][dead_letter_queue][reason]}"
      "plugin_id" => "%{[@metadata][dead_letter_queue][plugin_id]}"
      "plugin_type" => "%{[@metadata][dead_letter_queue][plugin_type]}"
      "entry_time" => "%{[@metadata][dead_letter_queue][entry_time]}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://ESServer1:9200", "https://ESServer2:9200", "https://ESServer3:9200", "https://ESServer4:9200"]
    data_stream => "true"
    data_stream_type => "logs"
    data_stream_dataset => "deadletterqueue"
    user => "elastic"
    password => "${es_pwd}"
    cacert => "/etc/logstash/certs/cacert.pem"
  }
}
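One way to check whether the DLQ metadata is actually present on the events (and therefore whether the %{[@metadata][dead_letter_queue][...]} references can resolve) is to temporarily add a stdout output that prints metadata; a debugging sketch, not part of the final pipeline:

output {
  # rubydebug with metadata => true prints the [@metadata] tree,
  # so you can see whether [@metadata][dead_letter_queue] exists at all
  stdout { codec => rubydebug { metadata => true } }
}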

The generated index (logs-deadletterqueue-default) does not contain the fields, and only some messages show up in the index. I suspect that it is either not able to fetch all the messages in the DLQ or running into some issue with the way the pipeline is created. There are a bunch of unprocessed logs in the DLQ path, listed below:

[root@ESServer1 beats_logs]# ls
100.log 105.log 10.log 13.log 18.log 22.log 27.log 31.log 36.log 40.log 45.log 4.log 54.log 59.log 63.log 68.log 72.log 77.log 81.log 86.log 90.log 95.log 9.log
101.log 106.log 110.log 14.log 19.log 23.log 28.log 32.log 37.log 41.log 46.log 50.log 55.log 5.log 64.log 69.log 73.log 78.log 82.log 87.log 91.log 96.log
102.log 107.log 111.log.tmp 15.log 1.log 24.log 29.log 33.log 38.log 42.log 47.log 51.log 56.log 60.log 65.log 6.log 74.log 79.log 83.log 88.log 92.log 97.log
103.log 108.log 11.log 16.log 20.log 25.log 2.log 34.log 39.log 43.log 48.log 52.log 57.log 61.log 66.log 70.log 75.log 7.log 84.log 89.log 93.log 98.log
104.log 109.log 12.log 17.log 21.log 26.log 30.log 35.log 3.log 44.log 49.log 53.log 58.log 62.log 67.log 71.log 76.log 80.log 85.log 8.log 94.log 99.log

The generated data stream has 4565 messages, but none of them have fields like "error_reason" or the other fields added in the filter section.

The DLQ contains messages for which Elasticsearch returned an error when the elasticsearch output tried to index them. If you try to index them again without fixing whatever caused the error, then I would expect the same error to occur again.

I would like to find the reason why a message ended up in the dead letter queue in the first place. But the fields added in the filter section are not showing up in the data stream.

I can manually get the error details; for example, "100.log" contains: "reason"=>"failed to parse field [process] of type [text] in document with id '8vC27XoBeH0M-gzpJxoq'. Preview of field's value: '{name=java.exe, executable=E:\Tableau\Tableau Server\packages\repository.20203.21.0216.1504\jre\bin\java.exe}'", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:178"}}}
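That error means the target index maps [process] as type text while Winlogbeat is sending it as an object, so indexing fails with a mapping conflict. One hypothetical workaround (the filter below is a sketch, not from the original thread; fixing the index template mapping would be the cleaner solution) is to flatten the conflicting object before the elasticsearch output:

filter {
  # hypothetical workaround: flatten the conflicting [process] object into
  # a string so it no longer collides with the existing text mapping
  ruby {
    code => "
      p = event.get('[process]')
      event.set('[process]', p.to_s) if p.is_a?(Hash)
    "
  }
}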
