Hi,
I am upgrading our Elasticsearch cluster from 1.5.2 to 2.3.4. I wrote a Python reindex script that uses elasticsearch.helpers.parallel_bulk, and at the same time I keep indexing new incoming docs to both the 1.5.2 and 2.3.4 clusters. While the script is running, the new incoming docs are not indexed and the queue gets bigger and bigger. I guess it's because parallel_bulk runs without a break...
Is there a way to run parallel_bulk with a sleep once in a while, or something like that?
I tried putting a sleep in the for loop that consumes the parallel_bulk results, but it didn't help.
The function I wrote around parallel_bulk is:
import time

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, parallel_bulk


def parallel_reindex(index_name, doc_type, chunk_size=500, scroll='10m',
                     scan_kwargs={}, bulk_kwargs={}):
    target_client = Elasticsearch(hosts=['node01:9200', 'node02:9200', 'node03:9200'],
                                  retry_on_timeout=True, max_retries=10, timeout=1000)
    source_client = Elasticsearch(hosts=['node04:9200', 'node05:9200', 'node06:9200'],
                                  retry_on_timeout=True, max_retries=10, timeout=1000)

    query = {"query": {"match_all": {}}}
    docs = scan(source_client,
                query=query,
                index=index_name,
                scroll=scroll,
                doc_type=doc_type,
                **scan_kwargs)

    def _change_doc_params_to_elastic_2(hits, target_client):
        health = target_client.cluster.health()
        while health.get("status") != "green":
            print("waiting 10 min for cluster to be green")
            time.sleep(600)
            health = target_client.cluster.health()  # re-fetch, otherwise this loops forever
            # reindex_log.info("cluster health is %s" % health)
        for h in hits:
            # changing dots to "_" (2.x forbids dots in field names)
            if 'x.y.z' in h['_source']:
                h['_source']['x_y_z'] = h['_source']['x.y.z']
                del h['_source']['x.y.z']
            # removing _analyzer (removed in 2.x)
            if '_analyzer' in h['_source']:
                del h['_source']['_analyzer']
            yield h

    kwargs = {
        'stats_only': True,
    }
    kwargs.update(bulk_kwargs)

    for response in parallel_bulk(target_client,
                                  _change_doc_params_to_elastic_2(docs, target_client),
                                  thread_count=8,
                                  chunk_size=chunk_size,
                                  max_chunk_bytes=20 * 1024 * 1024):  # was 1014, a typo
        # time.sleep(1)
        pass

    print("Done parallel_reindex of doc_type %s in index %s" % (doc_type, index_name))
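One idea I was thinking of trying, assuming parallel_bulk pulls lazily from the iterable it is given, is to throttle inside the generator that feeds it instead of in the loop that consumes its results. A minimal sketch (the `batch` and `pause` values are placeholders I would still need to tune):

```python
import time

def throttled(actions, batch=500, pause=0.5):
    """Yield actions unchanged, but sleep after every `batch` docs so the
    cluster gets some breathing room for the live incoming indexing."""
    for i, action in enumerate(actions, start=1):
        yield action
        if i % batch == 0:
            time.sleep(pause)
```

I would then pass `throttled(_change_doc_params_to_elastic_2(docs, target_client))` to parallel_bulk instead of the raw generator. Would that actually slow the worker threads down, or do they keep draining chunks that were already queued?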