Hello guys,
I have a pipeline in my ELK stack that reads data from Kafka and sends it to Elasticsearch, but I don't know how to set up the dead letter queue. My goal is for every error to be sent to a topic called dlq+date.
I tried several solutions, but none of them worked for me (I've pasted a sketch of one attempt below my config).
Can someone help me?
Logstash version: 7.7.1
This is my logstash.conf (without the DLQ):
input {
  kafka {
    id => "elkkfaplp1"
    group_id => "kafka1"
    topics_pattern => ".*"
    bootstrap_servers => "IP:PORT"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${KK_USR}' password='${KK_PWD}';"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    decorate_events => true
    codec => "json"
    auto_offset_reset => "earliest"
    metadata_max_age_ms => 500
  }
  kafka {
    id => "elkkfaplp2"
    group_id => "kafka2"
    topics_pattern => ".*"
    bootstrap_servers => "IP2:PORT"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${KK_USR}' password='${KK_PWD}';"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    decorate_events => true
    codec => "json"
    auto_offset_reset => "earliest"
    metadata_max_age_ms => 500
  }
}
filter {
  mutate {
    #lowercase => [ "[@metadata][kafka][topic]" ]
    strip => ["wildcard"]
  }
  if [wildcard] != "" {
    mutate {
      add_field => { "new_field" => "%{wildcard}" }
      remove_field => ["wildcard"]
    }
    json {
      source => "new_field"
    }
    mutate {
      remove_field => ["new_field"]
    }
  } else {
    mutate {
      remove_field => ["wildcard"]
    }
  }
}
output {
  if [deadletter] != "" {
    elasticsearch {
      id => "energisa_cluster1"
      hosts => ["https://elkesaplp1.comp.com.br", "https://elkesaplp2.comp.com.br", "https://elkesaplp3.comp.com.br"]
      user => "${ES_USR}"
      password => "${ES_PWD}"
      cacert => "/etc/logstash/comp-ca.pem"
      ssl => true
      ssl_certificate_verification => false
      index => "comp-%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
    }
  }
}
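For reference, this is the kind of thing I tried for the DLQ part (just a sketch of one attempt, not something that worked for me; the _jsonparsefailure check, the dlq- topic prefix, and reusing the same broker/credentials are my own guesses):

output {
  # Idea: route events that failed JSON parsing to a date-stamped DLQ topic
  # on the same broker, while everything else keeps going to Elasticsearch.
  if "_jsonparsefailure" in [tags] {
    kafka {
      bootstrap_servers => "IP:PORT"
      security_protocol => "SASL_PLAINTEXT"
      sasl_mechanism => "PLAIN"
      sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${KK_USR}' password='${KK_PWD}';"
      codec => "json"
      # assuming topic_id accepts sprintf date formats like the ES index above
      topic_id => "dlq-%{+YYYY.MM.dd}"
    }
  }
}

I also looked at Logstash's built-in dead letter queue (dead_letter_queue.enable in logstash.yml plus the dead_letter_queue input plugin), but as far as I understand it only captures events rejected by the elasticsearch output, and I couldn't get those events into a Kafka topic named with the date.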