I store the last run's metadata and a value calculated with the DATEDIFF function, which lets me keep track of progress. The downside, however, is that whenever something later in the pipeline fails, such as a filter, this value is still stored.
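To make the setup concrete, the input side is roughly like this (a simplified sketch: I'm assuming a jdbc input with SQL Server-style DATEDIFF, and the driver path, table, and column names are placeholders):

```
input {
  jdbc {
    jdbc_driver_library => "/opt/drivers/mssql-jdbc.jar"   # placeholder driver path
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "${JDBC_URL}"
    jdbc_user => "${JDBC_USER}"
    schedule => "*/5 * * * *"
    # calculated DATEDIFF value, plus a delta fetch based on the last run
    statement => "SELECT *, DATEDIFF(minute, modified, GETDATE()) AS age
                  FROM tickets WHERE modified > :sql_last_value"
    use_column_value => true
    tracking_column => "modified"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.last_run"
  }
}
```

The point is that `:sql_last_value` (the last-run metadata) advances as soon as the input fetches the rows, regardless of whether a later filter fails.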
What you are doing so far is correct. I think what you need is to enable dead-letter-queues:
The dead letter queue (DLQ) is designed as a place to temporarily write events that cannot be processed. The DLQ gives you flexibility to investigate problematic events without blocking the pipeline or losing the events.
So in your example, whenever a filter fails the docs would be stored in the DLQ. Then, you can fix the pipeline and use the dead_letter_queue input to read the failed events and process them.
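A secondary pipeline reading the DLQ back could look roughly like this (a sketch: the path, pipeline id, and index name are illustrative and need to match your setup):

```
input {
  dead_letter_queue {
    path => "/usr/share/logstash/data/dead_letter_queue"  # path.dead_letter_queue from logstash.yml
    pipeline_id => "main"                                 # id of the pipeline that wrote the events
    commit_offsets => true                                # remember the read position across restarts
  }
}
output {
  elasticsearch {
    index => "reprocessed-events"                         # illustrative destination
  }
}
```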
Thanks for the response! That sounds promising! I did read this on the documentation page:
The dead letter queue is currently supported only for the Elasticsearch output and conditional statements evaluation. The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried. It's also used when a conditional evaluation encounters an error.
I'm not entirely sure whether this means it only fires on a failure within the output plugin, or whether a failed filter also puts the event into this queue. For clarity: I'm using an HTTP filter plugin to send the data to an API, which returns the same data enriched. I want to ensure that if something goes wrong with this setup, the data is not lost.
Are you aware if this queue would solve this issue?
Would this secondary pipeline have to be entirely separate from the current pipeline, or could I just add the queue as a secondary input? What would you recommend?
Hmmm, I am not sure, but it sounds as if this wouldn't fit your use case then.
Depending on your use case, you could try a few things:
ingest your other data to Elasticsearch and use an enrich processor with the failure store enabled
use a Ruby filter to get the data, because in Ruby you can use begin-rescue structures
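The Ruby filter idea could look roughly like this (a sketch, not a tested implementation: it replaces the http filter with a Net::HTTP call to the same endpoint, and the field names are taken from your config, but the error handling and tag name are assumptions):

```
filter {
  ruby {
    init => "require 'net/http'; require 'json'; require 'uri'"
    code => '
      begin
        uri = URI(ENV["API_HOST"])                     # same endpoint the http filter used
        res = Net::HTTP.post(uri,
          { "Id" => event.get("id"), "Title" => event.get("title") }.to_json,
          "Content-Type" => "application/json")
        raise "HTTP #{res.code}" unless res.is_a?(Net::HTTPSuccess)
        event.set("embedding_response", JSON.parse(res.body))
      rescue => e
        event.tag("http_filter_failed")                # keep the event, mark it for rerouting
        event.set("[@metadata][http_error]", e.message)
      end
    '
  }
}
```

The begin-rescue means a timeout or bad response tags the event instead of losing it, so the output can route tagged events elsewhere.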
At least the failure store in Elasticsearch definitely catches ingest pipeline failures (I tried it out myself already):
A failure store is a secondary set of indices inside a data stream, dedicated to storing failed documents. A failed document is any document that, without the failure store enabled, would cause an ingest pipeline exception or that has a structure that conflicts with a data stream's mappings.
As for ingesting using the failure store: from my limited understanding of the documentation, this seems to apply only to data streams/time-series data. My data consists of various kinds of documents, such as support tickets and change requests. So I'm guessing the Ruby filter would be the preferred option.
I'll give that a try in the near future, and if it's a suitable solution, I'll let you know!
The DLQ only makes sense when you have events that were not indexed because Logstash received a 400 or 404 from Elasticsearch; it has nothing to do with filter errors.
I don't think it will help you in this case.
What could go wrong, and when it does go wrong, can you identify it in the returned event? You will probably need to redirect the events that went wrong to another destination, be it a different index or even a file, so you can analyze and reprocess them.
I'm not sure what the issue is here; the last run will be populated before any filter runs, as it comes from the input. Is your pipeline breaking and you need to reprocess the data?
I have an HTTP filter which points to an API. During development my machine can sometimes have trouble keeping up with the requests, and the request to the API gets lost (I'm not entirely sure what the reason is). When this happens, the input's tracking column is still updated, so it will not refetch the same data again.
So I'm looking for a way to handle this issue.
So would, for example, adding a tag to the event such as "http_filter_failed" be a sufficient way of handling this? Where I have a conditional in the output that routes failed events to a separate index?
I'll add the filter config below for more clarification:
filter {
  http {
    connect_timeout => 30
    request_timeout => 60
    url => "${API_HOST}"
    verb => "POST"
    body_format => "json"
    body => {
      Id => "%{id}"
      Title => "%{title}"
      Description => "%{description}"
      Created => "%{created}"
      Modified => "%{modified}"
    }
    target_body => "embedding_response"
  }
  # This was the temp fix (it would have to be removed, of course)
  if ![embedding_response] {
    drop { }
  }
  if [embedding_response] {
    mutate {
      replace => {
        "title" => "%{[embedding_response][Title]}"
        "description" => "%{[embedding_response][Description]}"
      }
      copy => {
        "[embedding_response][titleEmbedding]" => "title_embedding"
        "[embedding_response][descriptionEmbedding]" => "description_embedding"
      }
      remove_field => [ "embedding_response" ]
    }
  }
}
If you can identify that it fails, then adding a tag would be enough.
Then you can change your output to have a conditional based on that tag.
Something like:
output {
  if "tag_failed" in [tags] {
    elasticsearch {
      # destination for events with the failed tag
    }
  } else {
    elasticsearch {
      # destination for events that worked
    }
  }
}
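Filled in with the "http_filter_failed" tag you suggested, that could look something like this (the hosts variable and index names are just illustrative, adjust them to your setup):

```
output {
  if "http_filter_failed" in [tags] {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      index => "tickets-failed"   # park failed events here for reprocessing
    }
  } else {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      index => "tickets"          # normal destination
    }
  }
}
```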