Continuous data read using the Elasticsearch input plugin in Logstash

Hi Team,

I have a requirement where I need to read data from an Elasticsearch index (data stream), perform a "split" operation to break the array records into individual documents, and store them in a destination index (data stream) in Elasticsearch.

I have created a Logstash pipeline that runs every hour and reads the data from the last hour. However, I have observed that sometimes data goes missing without any errors in Logstash and with nothing in the dead letter queue or the persistent queue. Is there any way to track the documents processed and make sure that no documents are missed while reading/processing the data?

My pipeline:

input {
    elasticsearch {
        cloud_id => "xxxxxxxxxx"
        index => "orders-data"
        query => '{"query": {"range": {"event.ingest": {"gte": "now-1h"}}}}'
        schedule => "0 * * * *"
    }
}

filter {
    split {
        field => "[orders]"
    }
}

output {
    elasticsearch {
        cloud_id => "xxxx"
        index => "splitted-orders"
        ssl => true
        action => "create"
    }
}

When I was working with regular indices, I used to update the source index by adding a field called processed = true once Logstash had processed a document. But now I am unable to update the source index because it is a data stream.

input {
    elasticsearch {
        cloud_id => "xxxxxxxxxx"
        index => "orders-data"
        query => '{"query": {"range": {"event.ingest": {"gte": "now-1h"}}}}'
        schedule => "0 * * * *"
        docinfo => true
        docinfo_target => "[@metadata][doc]"
    }
}

filter {
    mutate {
        add_field => { "[processed]" => true }
    }
    split {
        field => "[orders]"
    }
}

output {
    elasticsearch {
        cloud_id => "xxxx"
        index => "orders-data"
        ssl => true
        action => "update"
        document_id => "%{[@metadata][doc][_id]}"
    }
    elasticsearch {
        cloud_id => "xxxx"
        index => "splitted-orders"
        ssl => true
        action => "create"
    }
}

How did you identify the missing documents? Have you checked their event.ingest time to see if there is some pattern?

Your issue may be related to the fact that your query runs at minute 0 of every hour but only looks back now-1h. This can lead to gaps: there is no guarantee that the query runs at the exact scheduled time (it may have some delay), and documents added at the end of the previous hour may not yet be searchable.

Try increasing the now-1h to something like now-70m or now-80m.

Your look-back window needs to be larger than your schedule interval.
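
For example, keeping your posted input and only widening the range (a minimal sketch; the exact overlap is up to you):

input {
    elasticsearch {
        cloud_id => "xxxxxxxxxx"
        index => "orders-data"
        # look back 70 minutes instead of 60 to absorb scheduler delay and refresh lag
        query => '{"query": {"range": {"event.ingest": {"gte": "now-70m"}}}}'
        schedule => "0 * * * *"
    }
}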

Hi @leandrojmp ,

If I increase the now-1h to now-70m, I may get duplicate records, right?

Regards,
Praveen Kumar
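
An overlapping window can indeed re-read the same source documents, so the overlap is usually paired with a deterministic document ID on the write side, so that a re-read maps to the same _id instead of producing a second copy. A minimal sketch using the fingerprint filter, assuming each order carries a unique field (the name [orders][order_id] is hypothetical) and that the destination is a regular index where id-based writes overwrite; with a data stream destination and action => "create", a repeated _id in the same backing index is rejected as a version conflict rather than overwritten:

filter {
    split {
        field => "[orders]"
    }
    fingerprint {
        # hypothetical unique field on each split order; replace with your own
        source => ["[orders][order_id]"]
        target => "[@metadata][fingerprint]"
        method => "SHA256"
    }
}

output {
    elasticsearch {
        cloud_id => "xxxx"
        index => "splitted-orders"
        ssl => true
        document_id => "%{[@metadata][fingerprint]}"
    }
}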