Handling errors during bulk indexing

(Oskar Persson) #1

I'm trying to set up a parsing system for a number of files in different formats (mostly XML). The content of these files is parsed (in parallel), with part of the data stored in a relational database and the rest in Elasticsearch. Each entry in the relational database has an id that matches the corresponding document in Elasticsearch.

Here is some simplified Python/Django code describing my current solution:

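from django.db import transaction
from elasticsearch import helpers as es_helpers

# generate_id, create_db_entry, create_es_document, XMLElement and es_conn
# are defined elsewhere in the project.

def parse_xml(xmlfile):
    for element in xmlfile:
        # The same id links the database row and the Elasticsearch document.
        id = generate_id()
        db_entry = create_db_entry(element, id)
        es_doc = create_es_document(element, id)

        yield db_entry, es_doc

# Note: zip(*...) consumes the whole generator, so every entry is held in memory.
db_entries, es_docs = zip(*parse_xml(xmlfile))

with transaction.atomic():
    XMLElement.objects.bulk_create(db_entries, batch_size=100)

# raise_on_error=False makes streaming_bulk yield failed documents
# instead of raising BulkIndexError on the first failing chunk.
for ok, result in es_helpers.streaming_bulk(es_conn, es_docs, chunk_size=100,
                                            raise_on_error=False):
    action, info = result.popitem()
    doc_id = info['_id']
    if not ok:
        print('Failed to index document %s' % (doc_id,))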

So, in summary:

  1. I parse the XML elements
  2. Store them, using a single bulk request, in the relational database
  3. Store them, using a single bulk request, in Elasticsearch

This works great as long as no error occurs in the middle of saving to Elasticsearch. If, for example, the computer crashes and I want to resume the indexing, I have no way to find out which elements have already been indexed and which have not. The XML contains no unique data per element that I could use to check this.

What is a good solution for this?

My current idea is to store the filename with each entry so that, on retry, I can simply delete all entries already saved for that file and start again from the beginning. The problem I see with that is huge files being parsed in an unstable environment, where errors of various kinds (mostly connection problems with the Elasticsearch server) are not uncommon and every failure would force a restart of the whole file.
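To make the filename idea concrete, here is a minimal sketch of what it might look like. It only builds the bulk actions and the cleanup query, so it runs without a server. The helper names `tag_with_source` and `cleanup_query`, the field name `source_file`, the index name `xml-elements` and the sample file name are all made up for illustration, and the `term` query assumes the field is mapped as a keyword.

```python
def tag_with_source(docs, source_file, index="xml-elements"):
    """Turn (doc_id, body) pairs into bulk actions that record their source file."""
    for doc_id, body in docs:
        yield {
            "_index": index,  # hypothetical index name
            "_id": doc_id,
            # "source_file" is a hypothetical field added to every document.
            "_source": {**body, "source_file": source_file},
        }


def cleanup_query(source_file):
    """Build a query matching every document that came from one file."""
    return {"term": {"source_file": source_file}}


actions = list(tag_with_source([("a1", {"title": "first"})], "feed-01.xml"))
print(actions[0]["_source"])
print(cleanup_query("feed-01.xml"))
```

On retry, the query could be sent as the query of a delete-by-query request, and the matching database rows removed with a filter on an equivalent (also hypothetical) `source_file` column, before the file is parsed again from the start.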

For the relational database, I'm wrapping the queries in a transaction, which solves the problem with a rollback if an error occurs. From what I've seen, a similar mechanism is not available in Elasticsearch.
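Without a rollback on the Elasticsearch side, one possible substitute is a compensating step: collect the ids that were reported as failed and then delete or retry the matching database rows. The sketch below assumes results in the `(ok, item)` shape that `streaming_bulk` yields when called with `raise_on_error=False` and `raise_on_exception=False`. The function name `failed_ids` and the sample data are invented for illustration.

```python
def failed_ids(results):
    """Collect the ids of documents that a bulk run reported as not indexed."""
    failed = []
    for ok, item in results:
        # Each item is a one-key dict such as {"index": {...details...}}.
        (_action, info), = item.items()
        if not ok:
            failed.append(info.get("_id"))
    return failed


# Hand-written sample in the shape described above (not real server output).
sample = [
    (True, {"index": {"_id": "1", "status": 201}}),
    (False, {"index": {"_id": "2", "status": 400,
                       "error": {"type": "mapper_parsing_exception"}}}),
    (False, {"index": {"_id": "3", "status": 429,
                       "error": {"type": "es_rejected_execution_exception"}}}),
]
print(failed_ids(sample))  # ['2', '3']
```

The returned ids could then be used to filter the database table, for example with `id__in` on the model if `id` is the shared key. This would not cover a crash of the parsing process itself, only failures that the bulk helper reports back.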

Maybe my whole approach is wrong; I'm open to suggestions for any changes :slight_smile:
