Helpers.parallel_bulk in Python not working?

Hi,

I'm trying to test out the parallel_bulk functionality in the Python client for Elasticsearch, and I can't seem to get helpers.parallel_bulk to work.

For example, using the regular helpers.bulk works:

bulk_data = []
header = data.columns
for i in range(len(data)):
    source_dict = {}
    row = data.iloc[i]
    for k in header:
        source_dict[k] = str(row[k])
    data_dict = {
        '_op_type': 'index',
        '_index': index_name,
        '_type': doc_type,
        '_source': source_dict
    }
    bulk_data.append(data_dict)

es.indices.create(index=index_name, body=settings, ignore=404)
helpers.bulk(client=es, actions=bulk_data)
es.indices.refresh()
es.count(index=index_name)

{'_shards': {'failed': 0, 'successful': 5, 'total': 5}, 'count': 13979}

But replacing it with helpers.parallel_bulk doesn't seem to index anything:

bulk_data = []
header = data.columns
for i in range(len(data)):
    source_dict = {}
    row = data.iloc[i]
    for k in header:
        source_dict[k] = str(row[k])
    data_dict = {
        '_op_type': 'index',
        '_index': index_name,
        '_type': doc_type,
        '_source': source_dict
    }
    bulk_data.append(data_dict)
es.indices.create(index=index_name, body=settings, ignore=404)

helpers.parallel_bulk(client=es, actions=bulk_data, thread_count=4)
es.indices.refresh()
es.count(index=index_name)

{'_shards': {'failed': 0, 'successful': 5, 'total': 5}, 'count': 0}

Am I missing something? I'm on Elasticsearch 2.1.1 with elasticsearch-py 2.1.0.

Hi,

parallel_bulk is a generator, meaning it is lazy: it does no work, so nothing gets indexed, until you start consuming its results. The proper way to use it is:

for success, info in parallel_bulk(...):
    if not success:
        print('A document failed:', info)
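
To see the laziness in isolation, here is a minimal sketch that needs no cluster. Note that fake_parallel_bulk is a hypothetical stand-in written for illustration, not part of elasticsearch-py; it only mimics the generator behaviour of yielding one (success, info) tuple per action.

```python
sent = []  # records which actions were "sent"; stands in for the cluster


def fake_parallel_bulk(actions):
    """Hypothetical stand-in for helpers.parallel_bulk: a generator that
    yields one (success, info) tuple per action."""
    for action in actions:
        sent.append(action)
        yield True, {"index": {"_id": action["_id"]}}


actions = [{"_id": i} for i in range(3)]

# Calling a generator function only creates the generator object;
# none of its body has run yet, so nothing has been "sent".
results = fake_parallel_bulk(actions)
sent_before = len(sent)

# Iterating over the results is what drives the work.
failed = [info for success, info in results if not success]
sent_after = len(sent)

print(sent_before, sent_after, failed)  # prints: 0 3 []
```

This is the same situation as in the question: the call returned immediately, and the count stayed at 0 because nothing ever iterated over the result.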

If you don't care about the results (which by default you don't have to, since any error will raise an exception), you can use the consume recipe from the itertools documentation (https://docs.python.org/2/library/itertools.html#recipes):

from collections import deque
deque(parallel_bulk(...), maxlen=0)
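
For reference, a sketch of that recipe written out in full, alongside the one-liner. The results function is a hypothetical stand-in for the generator returned by parallel_bulk, so this runs without a cluster.

```python
from collections import deque
from itertools import islice


def consume(iterator, n=None):
    """Advance the iterator n steps ahead; if n is None, exhaust it."""
    if n is None:
        # A zero-length deque pulls every item and keeps none of them.
        deque(iterator, maxlen=0)
    else:
        next(islice(iterator, n, n), None)


processed = []


def results():
    """Hypothetical stand-in for the generator returned by parallel_bulk."""
    for i in range(5):
        processed.append(i)
        yield True, i


leftover = deque(results(), maxlen=0)  # the one-liner from above
first_pass = len(processed)

consume(results())  # same effect through the recipe function
second_pass = len(processed)

print(len(leftover), first_pass, second_pass)  # prints: 0 5 10
```

Either form runs the generator to completion while storing none of the per-document results, so memory use does not grow with the number of documents.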

Hope this helps.

Btw, the reason it is lazy is so that you never have to materialize a list of all the records, which can be very expensive (for example when inserting data from a DB or a long file, or when doing a reindex). It also means that you don't have to pass in a list; you can pass in a generator and avoid creating a huge in-memory list yourself. In your example you could have a generator function:

def generate_actions(data):
    # Yield one action per DataFrame row instead of building a list.
    # index_name and doc_type are the globals from the question.
    for i in range(len(data)):
        row = data.iloc[i]
        # Stringify every column value, as in the original loop.
        source_dict = {k: str(row[k]) for k in data.columns}
        yield {
            '_op_type': 'index',
            '_index': index_name,
            '_type': doc_type,
            '_source': source_dict
        }

and then call:

for success, info in parallel_bulk(es, generate_actions(data), ...):
    if not success:
        print('Doc failed', info)

which avoids the need to have all the documents in memory at the same time. When working with larger datasets, this can save a significant amount of memory!
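
The same pattern covers the "long file" case. Below is a rough sketch, assuming a CSV input and Python 3. The names generate_csv_actions and rows_read, and the index and type names, are made up for illustration. islice stands in for the consumer, so you can see that rows are read only as they are requested.

```python
import csv
import io
from itertools import islice

rows_read = 0  # counts how many rows have actually been pulled from the file


def generate_csv_actions(lines, index_name, doc_type):
    """Yield one bulk action per CSV row, reading rows only on demand."""
    global rows_read
    for row in csv.DictReader(lines):
        rows_read += 1
        yield {
            "_op_type": "index",
            "_index": index_name,
            "_type": doc_type,
            "_source": row,
        }


# In-memory stand-in for a long file; a real open file object works the same.
source = io.StringIO("id,name\n1,a\n2,b\n3,c\n4,d\n")

actions = generate_csv_actions(source, "my-index", "my-type")

# Take only the first two actions; the remaining rows stay unread.
first_two = list(islice(actions, 2))

print(rows_read, len(first_two))  # prints: 2 2
```

In real use you would pass the generator to parallel_bulk in place of the islice call.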

Thanks, I was having the same issue. Not sure why this isn't called out in the docs.

I am following your recommendation to index a very large file, which takes hours. However, along the way, the memory footprint increases until the machine starts swapping. I am not saving anything myself, and memory profiling shows that the growth has something to do with parallel_bulk, but I cannot dig deeper than that. Am I missing something here? Does this make any sense to you at all? I am using Python 2.7.5 with elasticsearch 2.3.0.

Thank you for your help.

This DEFINITELY needs to be in the documentation!!!!!
