Dead Letter Queue - Issue

Environment:
Logstash 6.2.4 in docker, multi-pipeline mode
Redis input to Elasticsearch output

I have enabled the dead letter queue for my pipelines and I do see mapping-failure data landing in the dead letter queue folder under the appropriate pipelines. I'm trying to create a pipeline that reads the dead letter queue folder and pushes the events into another Elasticsearch index that has no mapping enforced. Although I do see the data going into that index, I'm not able to get @metadata, which contains the actual error; I only get the original message etc. I tried using the rubydebug codec on the output (elasticsearch), but didn't get the expected result. Is this possible? Please advise.

Goal: Read the DLQ data and push it to an Elasticsearch index that has no mapping defined, along with the @metadata info that captures the error from the original pipeline.

To replicate

Elasticsearch mapping:

PUT _template/test.dlq
{
  "index_patterns": ["test*"],
  "mappings": {
    "doc": {
      "properties": {
        "message": {
          "type": "integer"
        }
      }
    }
  }
}

Redis entry: LPUSH "test.1" "invalid"
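For context, a rough sketch of the original pipeline that produces the DLQ entries (assumed, since it isn't shown above; the dead letter queue itself is enabled via dead_letter_queue.enable and path.dead_letter_queue in the Logstash settings):

input {
  redis {
    host      => "REDIS"      # assumed Redis host
    data_type => "list"
    key       => "test.1"     # the list populated by the LPUSH above
  }
}
output {
  elasticsearch {
    hosts => ["ES:9200"]
    index => "test.1"         # matches the test* template, so "invalid" fails the integer mapping on message
  }
}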

Logstash pipeline (to process the DLQ, since the data that goes in is invalid per the mapping):

input {
  dead_letter_queue {
    path           => "/var/log/logstash/deadletter"
    commit_offsets => true
    pipeline_id    => "test.1"
    # codec => rubydebug { metadata => true }   <-- tried here
  }
}
output {
  elasticsearch {
    hosts           => ["ES:9200"]
    manage_template => false
    index           => "test.dlq.data"
    # codec => rubydebug { metadata => true }   <-- and here (tried both)
  }
}

Another issue: although I enabled commit_offsets, every time I restart the docker instance it reprocesses all of the historical DLQ entries. Should I explicitly mount the sincedb path?

Thanks for your advice.

I tried adding the filter below to the dead letter processing pipeline, but the target field came out with a null value.

mutate {
  rename => {
    "@metadata" => "failurereason"
  }
}

NOTE: I used codec => rubydebug { metadata => true } on the dead_letter_queue input plugin.

codec => rubydebug { metadata => true } doesn't make any sense for an input plugin, and you should never change the codec of the elasticsearch output.

I tried adding the filter below to the dead letter processing pipeline, but the target field came out with a null value.

mutate {
  rename => {
    "@metadata" => "failurereason"
  }
}

I'd expect that to work. Have you tried using the mutate filter's copy option instead? Or renaming individual subfields of @metadata?
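For example, something along these lines (an untested sketch; the subfield names under [@metadata][dead_letter_queue] are the ones the dead_letter_queue input is documented to set, so adjust them to whatever your events actually carry):

filter {
  mutate {
    # Promote the DLQ failure details to regular fields so they get indexed
    copy => {
      "[@metadata][dead_letter_queue][reason]"      => "failurereason"
      "[@metadata][dead_letter_queue][plugin_type]" => "failed_plugin_type"
      "[@metadata][dead_letter_queue][plugin_id]"   => "failed_plugin_id"
    }
  }
}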

Another issue: although I enabled commit_offsets, every time I restart the docker instance it reprocesses all of the historical DLQ entries. Should I explicitly mount the sincedb path?

Yes, or put the data directory in a persistent volume.
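For example (a sketch; the volume name or host path is up to you):

docker run -d --name testing \
  -v logstash_data:/usr/share/logstash/data \
  ... \
  docker.elastic.co:443/logstash/logstash:6.2.4

The dead_letter_queue input keeps its offset files under the Logstash data directory, so that is the directory that has to survive container restarts.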

Thanks Magnus.

Yes, I tried using mutate's copy as well, but with no result. Below is the version I tried after removing the codec.

input {
  dead_letter_queue {
    path           => "/var/log/logstash/deadletter"
    commit_offsets => true
    pipeline_id    => "test.1"
  }
  dead_letter_queue {
    path           => "/var/log/logstash/deadletter"
    commit_offsets => true
    pipeline_id    => "test.2"
  }
}
filter {
  mutate {
    copy => { "@metadata" => "@metadata" }
  }
}
output {
  elasticsearch {
    hosts                        => ["-----------------.com:9200"]
    manage_template              => false
    index                        => "dlq.log"
    user                         => "logstash_user"
    password                     => "---------------------"
    ssl                          => true
    ssl_certificate_verification => true
    cacert                       => "/usr/share/logstash/config/ssl/-----.crt"
  }
}

Below is what is seen in ES.

{
  "_index": "dlq.log",
  "_type": "doc",
  "_id": "vNTpzGMBbtAbmzXMdAO4",
  "_score": 1,
  "_source": {
    "type": "test.2",
    "@timestamp": "2018-06-04T22:27:29.427Z",
    "message": "wrong.5",
    "@version": "1"
  }
}

"wrong.5" is the data that is incorrectly passed to the attribute, which was expecting a integer value in ES. The value to type comes from the actual pipeline that tries to process the data.

Also, regarding the "sincedb" setting: I do see the folders created for the input plugins under the data directory, but no files are created in them. So whenever I stop and restart the docker instance, it processes all the entries again.

docker run -d --name testing \
  -v /etc/logstash/config:/usr/share/logstash/config \
  -v /etc/logstash/config/ssl:/usr/share/logstash/config/ssl \
  -v /etc/logstash/pipeline:/usr/share/logstash/pipeline \
  -v /var/log/logstash:/var/log/logstash \
  -v /etc/logstash/data:/usr/share/logstash/data \
  docker.elastic.co:443/logstash/logstash:6.2.4

[root@---- dead_letter_queue]# docker start testing
testing

[root@---- dead_letter_queue]# docker stop testing
testing
[root@---- dead_letter_queue]# ls -l
total 0
drwxr-xr-x 2 ---- ---- 6 Jun 4 23:16 test.1
drwxr-xr-x 2 ---- ---- 6 Jun 4 23:16 test.2
[root@---- dead_letter_queue]# ls -l test*
test.1:
total 0

test.2:
total 0

mutate {
  copy => { "@metadata" => "@metadata" }
}

What's this supposed to accomplish? You're copying a field onto itself.

Oops, wrong one. Below was the corrected version:

mutate {
  rename => {
    "@metadata" => "failurereason"
  }
}

But the result is as in my original post: I'm not able to get the error into Elasticsearch. Let me quickly retest with copy and report back. Sorry for the confusion.

I tried with a proper copy, using the filter below:

filter {
  mutate {
    copy => { "@metadata" => "failurereason" }
  }
}

Whenever I use this, I get the error below in the Logstash logs:

[ERROR] 2018-06-07 20:48:40.782 [Ruby-0-Thread-139@[main]>worker0: /usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:385] elasticsearch - An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely {:error_message=>"", :error_class=>"LogStash::Json::GeneratorError", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/json.rb:28:in `jruby_dump'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.1.1-java/lib/logstash/outputs/elasticsearch/http_client.rb:118:in `block in bulk'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.1.1-java/lib/logstash/outputs/elasticsearch/http_client.rb:118:in `block in bulk'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.1.1-java/lib/logstash/outputs/elasticsearch/http_client.rb:116:in `bulk'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.1.1-java/lib/logstash/outputs/elasticsearch/common.rb:243:in `safe_bulk'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.1.1-java/lib/logstash/outputs/elasticsearch/common.rb:157:in `submit'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.1.1-java/lib/logstash/outputs/elasticsearch/common.rb:125:in `retrying_submit'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-elasticsearch-9.1.1-java/lib/logstash/outputs/elasticsearch/common.rb:36:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:13:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:49:in `multi_receive'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:477:in `block in output_batch'", "org/jruby/RubyHash.java:1343:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:476:in `output_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:428:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:386:in `block in start_workers'"]}

and it repeats continuously.

My suspicion is that Logstash is not able to generate proper JSON output for the Elasticsearch insertion, because the @metadata field in the DLQ data is not plain text and needs something like rubydebug to render it. Also, could you please clarify the point below?

The documentation says "Codecs are essentially stream filters that can operate as part of an input or output.", so my first take was to use the rubydebug codec plugin to read the metadata.

Unless @metadata is read and formatted properly for ES indexing, it may not make it into Elasticsearch. Rubydebug was the codec plugin I found for that, but it isn't working with the ES output or the DLQ input; it works well with stdout.
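For reference, this is roughly how I inspect the events on stdout, where the @metadata contents do show up (a sketch; rubydebug with metadata => true is only meant for this kind of console inspection):

output {
  # Print the full event, including @metadata, to the console/Logstash log
  stdout { codec => rubydebug { metadata => true } }
}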

Thanks.
