Full Logstash pipeline with dead letter queue

Hello guys,

I have a pipeline in my ELK stack that reads data from Kafka and sends it to Elasticsearch, but I don't know how to set up the dead letter queue. My goal is for every failed event to be sent to a topic named dlq+date.

I tried several solutions, but none of them worked for me.

Can someone help me?

Logstash version 7.7.1

This is my logstash.conf (without the DLQ):
    input{
            kafka{
                    id => elkkfaplp1
                    group_id => "kafka1"
                    topics_pattern => ".*"
                    bootstrap_servers => "IP:PORT"
                    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${KK_USR}' password='${KK_PWD}';"
                    security_protocol => SASL_PLAINTEXT
                    sasl_mechanism => PLAIN
                    decorate_events => true
                    codec => "json"
                    auto_offset_reset => "earliest"
                    metadata_max_age_ms => 500
            }
            kafka{
                    id => elkkfaplp2
                    group_id => "kafka2"
                    topics_pattern => ".*"
                    bootstrap_servers => "IP2:PORT"
                    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${KK_USR}' password='${KK_PWD}';"
                    security_protocol => SASL_PLAINTEXT
                    sasl_mechanism => PLAIN
                    decorate_events => true
                    codec => "json"
                    auto_offset_reset => "earliest"
                    metadata_max_age_ms => 500
            }
    }
    filter{
            mutate {
                    #lowercase => [ "[@metadata][kafka][topic]" ]
                    strip => [wildcard]
            }
            if [wildcard] != "" {
                    mutate {
                            add_field => { "new_field" => "%{wildcard}" }
                            remove_field => [wildcard]
                    }

                    json {
                            source => [new_field]
                    }

                    mutate {
                            remove_field => [new_field]
                    }
            } else {
                    mutate {
                            remove_field => [wildcard]
                    }
            }
    }
    output {

            if [deadletter] != "" {
                    elasticsearch{
                            id => energisa_cluster1
                            hosts => ["https://elkesaplp1.comp.com.br", "https://elkesaplp2.comp.com.br", "https://elkesaplp3.comp.com.br"]
                            user => "${ES_USR}"
                            password => "${ES_PWD}"
                            cacert => "/etc/logstash/comp-ca.pem"
                            ssl => true
                            ssl_certificate_verification => false

                            index => "comp-%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
                    }
            }
    }


You do not need to make any changes to your logstash.conf to enable dead letter queues. They are enabled by a setting in logstash.yml. If Logstash gets a 400 or 404 response when it tries to index an event, it writes the event to the dead letter queue, which is stored as one or more files on disk.

You would then run another Logstash instance with a dead_letter_queue input plugin configured, which would read those files and do whatever you want with the events. It sounds like you want to write them to a Kafka topic. You could certainly do that using a kafka output.
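
As a rough sketch of that second configuration (not a tested setup), something like the following could read the queue and publish each failed event to a dated topic. The path assumes the default data directory of a package install, the `dlq-` topic prefix is only an illustration, and it assumes the kafka output interpolates `topic_id` per event. The authentication options (`security_protocol`, `sasl_mechanism`, `sasl_jaas_config`) are omitted here and would need to match your brokers.

    input {
        dead_letter_queue {
            # Assumption: default location, i.e. path.data/dead_letter_queue
            path => "/var/lib/logstash/dead_letter_queue"
            # ID of the pipeline whose failed events should be read
            pipeline_id => "main"
            # Remember the read position across restarts
            commit_offsets => true
        }
    }
    output {
        kafka {
            bootstrap_servers => "IP:PORT"
            # Hypothetical naming scheme: one topic per day, based on @timestamp
            topic_id => "dlq-%{+YYYY.MM.dd}"
            codec => json
        }
    }

Note that the kafka inputs in the question subscribe with `topics_pattern => ".*"`, so they would also consume these DLQ topics unless the pattern is narrowed or the DLQ topics live on a different cluster.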

Thanks, @Texugo.

Actually, I had not understood how the dead letter queue works until now. My problem is resolved.
My configuration is similar to the one below:

pipelines.yml

    - pipeline.id: main
      path.config: "/etc/logstash/conf.d/logstash.conf"

    - pipeline.id: deadletter
      path.config: "/etc/logstash/conf.d/logstash_dlq.conf"

logstash.yml

    dead_letter_queue.enable: true
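
For reference, two related logstash.yml settings can be set next to the one above. This is only a sketch: the values shown are illustrative, not recommendations. If `path.dead_letter_queue` is left unset, the queue is stored under `path.data/dead_letter_queue`, which is what the `path` option of the input below has to point to.

    dead_letter_queue.enable: true
    # Illustrative value: upper bound on the size of each pipeline's queue;
    # entries that would exceed it are dropped
    dead_letter_queue.max_bytes: 1024mb
    # Assumption: same location as the default for a package install
    path.dead_letter_queue: "/var/lib/logstash/dead_letter_queue"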

logstash_dlq.conf (created by me)

    input {
        dead_letter_queue {
            id => deadletter
            path => "/var/lib/logstash/dead_letter_queue"
            commit_offsets => true
            pipeline_id => "main"
        }
    }
    filter {
        mutate {
            add_field => {
                "error_reason" => "%{[@metadata][dead_letter_queue][reason]}"
                "plugin_id" => "%{[@metadata][dead_letter_queue][plugin_id]}"
                "plugin_type" => "%{[@metadata][dead_letter_queue][plugin_type]}"
                "entry_time" => "%{[@metadata][dead_letter_queue][entry_time]}"
            }
        }
    }
    output {
        elasticsearch {
            # Elasticsearch config
        }
    }

