Hi Team,
I have a requirement where I need to read data from an Elasticsearch index (data stream), perform a "split" operation to break the array records into individual documents, and store them in a destination index (data stream) in Elasticsearch.
I have created a Logstash pipeline that runs every hour and reads the data from the last hour. However, I have observed that sometimes data is missing, with no errors in Logstash and nothing in the dead letter queue or the persistent queue. Is there a way to track the documents processed and make sure that no documents are missed while reading/processing the data?
My pipeline:
input {
  elasticsearch {
    cloud_id => "xxxxxxxxxx"
    index    => "orders-data"
    # read documents ingested in the last hour, once per hour
    query    => '{"query": {"range": {"event.ingest": {"gte": "now-1h"}}}}'
    schedule => "0 * * * *"
  }
}

filter {
  # turn each element of the [orders] array into its own event
  split {
    field => "[orders]"
  }
}

output {
  elasticsearch {
    cloud_id => "xxxx"
    index    => "splitted-orders"
    ssl      => true
    action   => "create"
  }
}
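One idea I have been considering for tracking/deduplication (untested) is to overlap the read window slightly (e.g. now-2h) and give each split record a deterministic document_id built with the fingerprint filter, so that re-reading the same source document does not create duplicates in the destination. In this sketch [orders][order_id] is just a placeholder for whatever field uniquely identifies an order in my data:

filter {
  split {
    field => "[orders]"
  }
  # build a stable id from a field that uniquely identifies each split record
  # ([orders][order_id] is a placeholder for the real key field)
  fingerprint {
    method => "SHA256"
    source => ["[orders][order_id]"]
    target => "[@metadata][fingerprint]"
  }
}

output {
  elasticsearch {
    cloud_id    => "xxxx"
    index       => "splitted-orders"
    ssl         => true
    action      => "create"
    # duplicates are rejected with a 409 and dropped, but only within the
    # current backing index of the destination data stream
    document_id => "%{[@metadata][fingerprint]}"
  }
}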
When I was working with regular indexes, I used to update the source index by adding a field called processed = true once Logstash had processed the document. But now I am unable to update the source index because it is a data stream. This is what I tried:
input {
  elasticsearch {
    cloud_id       => "xxxxxxxxxx"
    index          => "orders-data"
    query          => '{"query": {"range": {"event.ingest": {"gte": "now-1h"}}}}'
    schedule       => "0 * * * *"
    # keep the source document's _index/_id in metadata so it can be updated later
    docinfo        => true
    docinfo_target => "[@metadata][doc]"
  }
}

filter {
  mutate {
    add_field => { "[processed]" => true }
  }
  split {
    field => "[orders]"
  }
}

output {
  # mark the source document as processed
  elasticsearch {
    cloud_id    => "xxxx"
    index       => "orders-data"
    ssl         => true
    action      => "update"
    document_id => "%{[@metadata][doc][_id]}"
  }
  # write the split documents to the destination
  elasticsearch {
    cloud_id => "xxxx"
    index    => "splitted-orders"
    ssl      => true
    action   => "create"
  }
}
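Since a data stream only accepts create operations through its own name, I was wondering whether the update could instead target the concrete backing index that docinfo records. A rough sketch of just that update output (untested; the destination output would stay the same):

output {
  elasticsearch {
    cloud_id    => "xxxx"
    # [@metadata][doc][_index] holds the backing index name
    # (e.g. ".ds-orders-data-...") when docinfo => true, and backing
    # indices do accept updates by _id
    index       => "%{[@metadata][doc][_index]}"
    ssl         => true
    action      => "update"
    document_id => "%{[@metadata][doc][_id]}"
  }
}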