We need to extract all documents of an index from Elasticsearch to a file, so our pipeline consists of an Elasticsearch input and a file output. At the time of querying Elasticsearch we know that there will be no further changes to the index - it is static. Extraction can be started both by the scheduler and by a single run of the Logstash container. After extraction, the file will be processed by another program that is not related to the Elastic Stack and runs in a separate Docker container.
Now, processing of the file by the other program must not begin until we know for sure that the file contains the complete index. The index is pretty large; the size of an output file is around 50 TB. We wish to guard against any "dirty reads", e.g. reading while the extraction is still going on, or in case Logstash crashed and the file is incomplete.
Is there a way to build a pipeline that guarantees that all events generated by an Elasticsearch query have been processed and the full index has been written successfully, and then signals completion to the other program? E.g., is there a way to write a "done" record to the file?
I am stuck. As a Logstash newbie I could not find an obvious way to achieve this. Any suggestions?
You could use the pipeline stats API to check the throughput on the output and start processing when that settles at zero. Note that the API returns the cumulative number of events processed, not the throughput, so you would need to compare the values from multiple API calls.
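For example, a polling loop along these lines (just a sketch; it assumes Logstash's monitoring API on its default port 9600, a pipeline id of "main", and jq being available):
prev=-1
while true; do
  cur=$(curl -s 'http://localhost:9600/_node/stats/pipelines/main' | jq '.pipelines.main.events.out')
  if [[ "$cur" -eq "$prev" ]]; then
    break # cumulative event count has settled, output appears done
  fi
  prev=$cur
  sleep 10
done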
Thanks for your suggestion, Badger. I am not sure the pipeline stats API would work for us in the case of a single run of the export container, because we cannot get the stats from Logstash once the container is done or has aborted.
However, your suggestion made me think about the Elasticsearch _count-API.
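For reference, the _count API simply returns the number of documents matching a query, e.g. (the count value here is only illustrative):
curl -s 'http://elasticsearch:9200/myindex/_count'
# => {"count":42,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}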
I failed to call the count API via the Elasticsearch input plugin, since that only seems to support the search API. But I can call it with the http filter. So here is my provisional approach. Pipeline:
input {
  # single event that triggers getting the index size
  generator {
    message => [ "dummy" ]
    tags => [ "indexsize" ]
    count => 1
  }
  # full index query
  elasticsearch {
    ...
  }
  ...
}
filter {
  if "indexsize" in [tags] {
    http {
      url => 'http://elasticsearch:9200/myindex/_count'
      verb => "GET"
      target_body => "body"
      body_format => "json"
    }
    json {
      source => "body"
    }
    mutate {
      rename => { "[body][count]" => "[count]" }
      # remove http fields
      remove_field => ["body", "headers", "sequence", "host", "message"]
    }
  }
  ...
}
output {
  if "indexsize" in [tags] {
    file {
      path => "myindex-expectedsize.txt"
      # write only the size of the index
      codec => line { format => "%{count}" }
    }
  } else {
    # export index events to index-file
  }
  ...
}
This did create a file containing the number of documents in the index.
Now, before processing the index-file we can check - e.g. using fstat - whether the index-file is still open. If it is not, we can compare the number of lines in the index-file with the number of documents given in "myindex-expectedsize.txt". If there is any difference, we err on the side of caution and report an error. In a shell script:
if [[ "$(wc -l myindex.json | cut -d ' ' -f1)" -eq "$(cat myindex-expectedsize.txt)" ]]; then
echo "error: mismatch between actual and expected number of entries"
else
# process myindex.json
fi
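The "still open" check could be done with lsof, e.g. (assuming lsof is available where the check runs):
if lsof myindex.json > /dev/null 2>&1; then
  # some process still has the file open, so extraction may not be finished
  echo "error: myindex.json is still open"
  exit 1
fi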
I hope there aren't any problems with this approach? I'd appreciate suggestions for improvement or better approaches.