Configuring Logstash to Use Dead Letter Queues


#1

Per Configuring Logstash to Use Dead Letter Queues:

Dead letter queues are disabled by default. To enable dead letter queues, set the dead_letter_queue.enable option in the logstash.yml settings file:

dead_letter_queue.enable: true

I enabled the option as described in Configuring Logstash for Docker | Logstash Reference [6.2] | Elastic:

# grep DEAD_LETTER_QUEUE.ENABLE docker-compose.override.yml 
                        - DEAD_LETTER_QUEUE.ENABLE=true
#
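For context, the env-var approach above corresponds to a compose fragment roughly like the following. This is a sketch: the service name and image tag are assumptions, not taken from this thread.

```yaml
# hypothetical docker-compose.override.yml fragment
services:
  logstash11:
    image: docker.elastic.co/logstash/logstash:6.2.1
    environment:
      # the official image maps uppercased env vars onto logstash.yml
      # settings, so this becomes: dead_letter_queue.enable: true
      - DEAD_LETTER_QUEUE.ENABLE=true
```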

Processing Events in the Dead Letter Queue

# cat pipeline/10-input-dead_letter_queue.conf 
input {
	dead_letter_queue {
		path => "/usr/share/logstash/data/dead_letter_queue/main"
	}
}
# 

and now, whenever I start Logstash, I get the following errors:

logstash11  | [2018-02-21T16:04:40,494][ERROR][logstash.pipeline        ] Error registering plugin {:pipeline_id=>"main", :plugin=>"<LogStash::Inputs::DeadLetterQueue path=>\"/usr/share/logstash/data/dead_letter_queue/main\", id=>\"dbb698a5fbc95fc57c4d4035e6aea289b838d574786111ae96ee36dc6438b7fc\", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>\"plain_9c74df7f-06ba-46db-a683-a034892e1b8c\", enable_metric=>true, charset=>\"UTF-8\">, pipeline_id=>\"main\", commit_offsets=>true>", :error=>"/usr/share/logstash/data/dead_letter_queue/main/main", :thread=>"#<Thread:0x749b0f73 run>"}
logstash11  | [2018-02-21T16:04:40,886][ERROR][logstash.pipeline        ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>java.nio.file.NoSuchFileException: /usr/share/logstash/data/dead_letter_queue/main/main, :backtrace=>["sun.nio.fs.UnixException.translateToIOException(sun/nio/fs/UnixException.java:86)", "sun.nio.fs.UnixException.asIOException(sun/nio/fs/UnixException.java:111)", "sun.nio.fs.LinuxWatchService$Poller.implRegister(sun/nio/fs/LinuxWatchService.java:246)", "sun.nio.fs.AbstractPoller.processRequests(sun/nio/fs/AbstractPoller.java:260)", "sun.nio.fs.LinuxWatchService$Poller.run(sun/nio/fs/LinuxWatchService.java:364)", "java.lang.Thread.run(java/lang/Thread.java:748)"], :thread=>"#<Thread:0x749b0f73 run>"}
logstash11  | [2018-02-21T16:04:40,909][ERROR][logstash.agent           ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: LogStash::PipelineAction::Create/pipeline_id:main, action_result: false", :backtrace=>nil}

The second part of my issue: I'd like events from the DLQ to be displayed via:

output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}

However, I already have an output as part of the pipeline; how would I display only events from the DLQ?

Please advise.


#2

You have told Logstash that the DLQs are under /usr/share/logstash/data/dead_letter_queue/main, and you have not given it a pipeline_id, so it defaults to main and looks for /usr/share/logstash/data/dead_letter_queue/main/main. The error is telling you that this directory does not exist.
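In other words, point path at the parent dead_letter_queue directory and let the plugin append the pipeline id itself. A minimal sketch:

```
input {
	dead_letter_queue {
		# path must be the parent dead_letter_queue directory;
		# the plugin appends the pipeline_id ("main" by default)
		path => "/usr/share/logstash/data/dead_letter_queue"
	}
}
```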

I do not understand the second part of your question. If the input in your DLQ pipeline is the DLQ, then the pipeline will only process events from the DLQ.
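Note that this holds only if the DLQ reader really is a separate pipeline; if its config file sits in the same pipeline directory as your other inputs and outputs, Logstash concatenates everything into one pipeline. Since 6.0 you can declare separate pipelines in pipelines.yml; a sketch, where the directory names are assumptions:

```yaml
# hypothetical pipelines.yml: run the DLQ reader as its own pipeline
# so its output never mixes with the main pipeline's outputs
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline/*.conf"
- pipeline.id: dlq-reader
  path.config: "/usr/share/logstash/dlq-pipeline/*.conf"
```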


#3

Ahh, I see, main/main. So I tweaked it a bit:

# cat pipeline/10-input-dead_letter_queue.conf 
input {
	dead_letter_queue {
		path => "/usr/share/logstash/data/dead_letter_queue"
	}
}
# 

and now Logstash's log shows me the following:

logstash11 | [2018-02-21T20:25:04,224][WARN ][org.logstash.common.io.DeadLetterQueueWriter] Event previously submitted to dead letter queue. Skipping...


(screenshot: "A picture is worth ten thousand words")

my output:

# grep -v ^# pipeline/30-output-elasticsearch.conf 
output {
	if [tags] {
		elasticsearch {
			hosts => "elasticsearch:9200"
			user => "elastic"
			password => "X"
			manage_template => false
			index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
			document_type => "%{[@metadata][type]}"
		}
	} else {
		elasticsearch {
			hosts => "elasticsearch:9200"
			user => "elastic"
			password => "X"
		}
	}
}
# 

For events from the DLQ, I'd like to use:

output { stdout { codec => rubydebug } }

How can I accomplish that? Assign an id to the DLQ input and use an if statement in the output to redirect to an alternative output?

Please advise.


#4

That sounds entirely reasonable.


#5

input:

# cat pipeline/10-input-dead_letter_queue.conf 
input {
	dead_letter_queue {
		id => "dlq"
		path => "/usr/share/logstash/data/dead_letter_queue"
	}
}
# 

output:

# cat pipeline/30-output-elasticsearch.conf | tail -6
	} else if ["dlq"] == [id] {
		stdout {
			codec => rubydebug { metadata => true }
		}
	}
}
# 

The output is wrong and breaks Logstash.

Do you think you can give me a hand here, please (with a cherry on top)?


#6

Instead of setting id (since I do not know whether you can reference that in the way you want to), try adding a tag:

tags => [ "dlq" ]

and then checking for that

} else if "dlq" in [tags] {
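Putting the two pieces together, the input and output would look roughly like this (an untested sketch; the branch order is deliberate, since a broader condition such as "if [tags]" placed first would capture the event before the dlq check runs):

```
input {
	dead_letter_queue {
		path => "/usr/share/logstash/data/dead_letter_queue"
		# tag DLQ events so the output section can route on them
		tags => [ "dlq" ]
	}
}

output {
	# check the dlq tag before any broader conditions
	if "dlq" in [tags] {
		stdout {
			codec => rubydebug { metadata => true }
		}
	} else {
		# ... existing elasticsearch outputs go here ...
	}
}
```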

#7

Hmm, I tried it, and even though Logstash still produces the following messages:

logstash11 | [2018-02-21T21:52:43,054][WARN ][org.logstash.common.io.DeadLetterQueueWriter] Event previously submitted to dead letter queue. Skipping...

I don't see anything in stdout (codec=>rubydebug)...

Any ideas?


#8

Logstash knows it has processed the item that is in the DLQ, so it is not going to reprocess it unless you take steps to make that happen. There is a sincedb_path parameter on the dead_letter_queue input that tells it which file to use to keep track of which queue items have been processed. You appear to be using the default. You could delete that file, which should cause Logstash to start at the beginning of the queue again (and recreate the sincedb).
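If you want that file in a known location (to make it easy to delete, or to persist it across container restarts), you can set sincedb_path explicitly on the input. A sketch; the chosen file path is an assumption:

```
input {
	dead_letter_queue {
		path => "/usr/share/logstash/data/dead_letter_queue"
		# track consumed DLQ entries in a known file; delete this
		# file to re-read the queue from the beginning
		sincedb_path => "/usr/share/logstash/data/dlq_sincedb"
	}
}
```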


#9

I'm using a Docker container for Logstash and I don't preserve the sincedb when restarting the container. Regardless, I'm not interested in old events; all I care about is what is happening now, and I can't get current events to stdout. The message above is for a new event...


(system) #10

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.