Is there a way to run parallel_bulk with sleep time?


(Moshe Sucaz) #1

Hi,
I am upgrading our elastic from 1.5.2 to 2.3.4. I wrote python re index script that using elasticsearch.helpers.parallel_bulk, and also I keep indexing new incoming docs to 1.5.2 and 2.3.4 clusters. When I am running the script, the new incoming docs are not indexed and the queue get bigger and bigger. I guess its because parallel_bulk is running without a break...
Is there a way to run parallel_bulk with sleep time once in a while, or something like that?
I tried to put sleep in the for loop of the call to parallel_bulk, but it didn't help

The function that I wrote to parallel_bulk is:

def parallel_reindex(index_name, doc_type, chunk_size=500, scroll='10m', scan_kwargs={}, bulk_kwargs={}):
target_client = Elasticsearch(hosts = ['node01:9200', 'node02:9200', 'node03:9200'], retry_on_timeout = True, max_retries = 10, timeout = 1000)
source_client = Elasticsearch(hosts = ['node04:9200', 'node05:9200', 'node06:9200'], retry_on_timeout=True, max_retries=10, timeout=1000)
query = {"query": {"match_all": {}}}
docs = scan(source_client,
query = query,
index = index_name,
scroll = scroll,
doc_type=doc_type,
** scan_kwargs
)

def _change_doc_params_to_elastic_2(hits, target_client):
    health = target_client.cluster.health()
    while (health.get("status") != "green"):
        print("waiting 10 min for cluster to be green")
        time.sleep(600)
    # reindex_log.info("cluster health is %s" % health)
    for h in hits:
        # changing dots to “_”
        if 'x.y.z' in h['_source']:
            h['_source']['x_y_z'] = h['_source']['x.y.z']
            del h['_source']['x.y.z']
        # removing _analyzer
        if '_analyzer' in h['_source']:
            del h['_source']['_analyzer']
        yield h
kwargs = {
    'stats_only': True,
}
kwargs.update(bulk_kwargs)
for response in parallel_bulk(target_client, _change_doc_params_to_elastic_2(docs, target_client), thread_count=8, chunk_size=chunk_size, max_chunk_bytes=20 * 1014 * 1024):
  #time.sleep(1)  
pass
print("Done parallel_reindex of doc_type %s in index %s" % (doc_type,index_name))

(system) #2