parallel_bulk is a generator, meaning it is lazy and won't produce any results until you start consuming them. The proper way to use it is:
for success, info in parallel_bulk(...):
    if not success:
        print('A document failed:', info)
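For concreteness, here is a minimal self-contained sketch of that pattern; the client address, the index name 'my-index', the doc type and the sample documents are made up for illustration and not part of your example:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

es = Elasticsearch()  # assumes a node running on localhost:9200

# a generator expression producing a few illustrative index actions
actions = (
    {'_op_type': 'index', '_index': 'my-index', '_type': 'doc', '_source': {'n': i}}
    for i in range(1000)
)

# nothing is sent until we iterate; the loop drives the bulk requests
for success, info in parallel_bulk(es, actions):
    if not success:
        print('A document failed:', info)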
If you don't care about the results (and by default you don't have to, since any error will cause an exception), you can use the consume recipe from the itertools recipes (https://docs.python.org/2/library/itertools.html#recipes), which exhausts the generator without keeping the results around:
from collections import deque
deque(parallel_bulk(...), maxlen=0)
Btw the reason it is lazy is so that you never have to materialize a list with all the records, which can be potentially very expensive (for example when inserting data from a DB, a long file, or doing a reindex). It also means that you don't have to pass in a list; you can pass in a generator, thus avoiding creating a huge in-memory list yourself. In your example you could have a generator function:
def generate_actions(data):
    # walk the DataFrame row by row and yield one index action per row
    for i in range(len(data)):
        source_dict = {}
        row = data.iloc[i]
        for k in header:
            source_dict[k] = str(row[k])
        yield {
            '_op_type': 'index',
            '_index': index_name,
            '_type': doc_type,
            '_source': source_dict
        }
and then call:
for success, info in parallel_bulk(es, generate_actions(data), ...):
    if not success:
        print('Doc failed', info)
which will avoid the need to have all the documents present in memory at any given time. When working with larger datasets this can be a significant memory saving!
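The same idea works when the source is a large file rather than a DataFrame. Here is a sketch under the assumption of a newline-delimited JSON file; the path 'data.ndjson' is a placeholder and index_name, doc_type and es are assumed to already exist as in your code:

import json

def generate_actions_from_file(path):
    # stream the file line by line instead of loading it all at once
    with open(path) as f:
        for line in f:
            yield {
                '_op_type': 'index',
                '_index': index_name,
                '_type': doc_type,
                '_source': json.loads(line)
            }

for success, info in parallel_bulk(es, generate_actions_from_file('data.ndjson')):
    if not success:
        print('Doc failed', info)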
I am following your recommendation to index a very large file, which takes hours. However, along the way the memory footprint increases until it starts swapping. Internally I am not saving anything, and a memory profile shows that it has something to do with parallel_bulk, but I cannot get deeper than that. Am I missing something here? Does this make any sense to you at all? I am using Python 2.7.5 with elasticsearch 2.3.0.