I'm trying to set up a parsing system for a number of files in different formats (mostly XML). The contents of these files are parsed in parallel, with part of the data stored in a relational database and the rest in Elasticsearch. Each entry in the relational database has an id that matches the corresponding document in Elasticsearch.
Here is some simplified Python/Django code describing my current solution:
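```python
from django.db import transaction
from elasticsearch import helpers as es_helpers

# XMLElement, es_conn, generate_id, create_db_entry and create_es_document are defined elsewhere
def parse_xml(xmlfile):
    for element in xmlfile:
        entry_id = generate_id()
        db_entry = create_db_entry(element, entry_id)
        es_doc = create_es_document(element, entry_id)
        yield db_entry, es_doc

db_entries, es_docs = zip(*parse_xml(xmlfile))

# A single bulk insert into the relational database, inside a transaction
with transaction.atomic():
    XMLElement.objects.bulk_create(db_entries, batch_size=100)

# A single streaming bulk request to Elasticsearch; raise_on_error=False reports
# failed documents as ok=False instead of raising BulkIndexError
for ok, result in es_helpers.streaming_bulk(es_conn, es_docs, chunk_size=100, raise_on_error=False):
    action, result = result.popitem()
    doc_id = result['_id']
    if not ok:
        print('Failed to index document %s' % (doc_id,))
```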
So, in summary:
- I parse the XML elements
- Store them, using a single bulk request, in the relational database
- Store them, using a single bulk request, in Elasticsearch
This works well as long as no error occurs partway through saving to Elasticsearch. If, for example, the computer crashes and I want to resume indexing, I have no way to find out which elements have already been indexed and which have not. The XML elements contain no unique data that I could use to tell what has already been indexed.
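One piece of data that is stable across runs is an element's position within its file. As a minimal sketch (an assumption, not part of the setup above), a hypothetical `make_element_id` could replace `generate_id()` and derive the id from the filename and the element's index, so that re-parsing a file after a crash yields the same ids. Re-sent Elasticsearch actions then overwrite documents that were already indexed instead of duplicating them. This assumes a file's contents and element order do not change between runs; `make_element_id`, `build_es_action` and the index name `xml-elements` are illustrative names only.

```python
import hashlib


def make_element_id(filename: str, position: int) -> str:
    """Derive a stable id from the source filename and the element's position.

    Hypothetical replacement for generate_id(): the same (filename, position)
    pair always yields the same id, so re-parsing a file after a crash
    reproduces the ids of elements that were already stored.
    """
    key = f"{filename}:{position}".encode("utf-8")
    return hashlib.sha1(key).hexdigest()


def build_es_action(filename: str, position: int, source: dict,
                    index: str = "xml-elements") -> dict:
    """Build a bulk action in the dict form accepted by elasticsearch.helpers.

    No "_op_type" is set, so the helpers default to "index", which replaces
    an existing document with the same _id rather than adding a duplicate.
    """
    return {
        "_index": index,
        "_id": make_element_id(filename, position),
        "_source": source,
    }
```

In `parse_xml` this would mean iterating with `for position, element in enumerate(xmlfile):` and passing the filename in. On the database side, if the id is the model's primary key, Django's `bulk_create(..., ignore_conflicts=True)` (Django 2.2+, on backends that support it) skips rows that already exist.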
What is a good solution for this?
My current idea is to store the filename with each entry, so that on a retry I can simply delete all entries already saved for that file and start over from the beginning of it. The problem I see with that is huge files that need to be parsed in an unstable environment, where errors of various kinds (mostly connection problems with the Elasticsearch server) are not uncommon: restarting a huge file from scratch after each such error wastes a lot of work.
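A variation on this idea that avoids restarting huge files from the beginning is checkpointing: store each file in fixed-size chunks and, after a chunk has been written to both stores, record how many chunks of that file are done. The sketch below uses only the standard library; the JSON checkpoint file, `index_file`, and the `store_chunk` callback (which would wrap the `bulk_create` and `streaming_bulk` calls for one chunk) are hypothetical. It assumes the parser yields elements in the same order on every run.

```python
import json
import os
from typing import Callable, Iterable, List, Sequence


def chunked(items: Sequence, size: int) -> Iterable[List]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def load_checkpoint(path: str) -> dict:
    """Return {filename: completed_chunks}, or {} if no checkpoint exists yet."""
    if not os.path.exists(path):
        return {}
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def save_checkpoint(path: str, state: dict) -> None:
    # Write to a temporary file and rename it, so a crash mid-write
    # cannot leave a truncated checkpoint behind.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(state, fh)
    os.replace(tmp_path, path)


def index_file(filename: str, elements: Sequence,
               store_chunk: Callable[[List], None],
               checkpoint_path: str, chunk_size: int = 100) -> int:
    """Store `elements` chunk by chunk, resuming after the last finished chunk.

    store_chunk is a hypothetical callback that writes one chunk to both the
    database and Elasticsearch and raises if either write fails.
    Returns the number of chunks stored during this call.
    """
    state = load_checkpoint(checkpoint_path)
    done = state.get(filename, 0)  # chunks completed in earlier runs
    stored = 0
    for chunk_no, chunk in enumerate(chunked(elements, chunk_size)):
        if chunk_no < done:
            continue  # already stored before the crash
        store_chunk(chunk)
        state[filename] = chunk_no + 1
        save_checkpoint(checkpoint_path, state)
        stored += 1
    return stored
```

If the process dies after `store_chunk` succeeds but before the checkpoint is saved, that one chunk is sent again on resume, so the writes still need to be idempotent (for example, by reusing the same ids on every run). For files too large to hold in memory, `chunked` would have to consume the parser lazily (e.g. with `itertools.islice`) instead of slicing a list.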
For the relational database, I wrap the queries in a transaction, which solves the problem: if an error occurs, everything is rolled back. From what I've seen, Elasticsearch offers no equivalent.
Maybe my whole approach is wrong; I'm open to suggestions for any changes.