How to create parallel cursors on my dataset in Elasticsearch


(Anubhav) #1

I have 30,000 instances in Elasticsearch. I tried the scroll API in a while loop with the following settings:

  1. with "search context alive": 50m and "size": 2
  2. with "search context alive": 5m and "size": 100

Either way, I cannot open more than one size window over the instances at a time. With a single scroll, getting all the documents out will take up to 20 days, and there are many other instances to parse.
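For reference, a single-scroll loop of the kind described above might look like the sketch below. It is only an illustration under assumptions: the "search context alive" setting is taken to be the `scroll` keep-alive parameter, the index name `data` is borrowed from the request shown later in this post, and `post_json` and `scroll_all` are hypothetical helper names.

```python
import json
import urllib.request


def post_json(url, body):
    """POST a JSON body and return the decoded JSON response."""
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


def scroll_all(es_url, index, query, size=100, keep_alive="5m", post=post_json):
    """Yield every hit for `query`, one scroll page at a time."""
    # The first request opens the search context and returns the first page.
    page = post("{}/{}/_search?scroll={}".format(es_url, index, keep_alive),
                {"size": size, "query": query})
    while page["hits"]["hits"]:
        for hit in page["hits"]["hits"]:
            yield hit
        # Later requests send the scroll_id back to fetch the next page.
        page = post("{}/_search/scroll".format(es_url),
                    {"scroll": keep_alive, "scroll_id": page["_scroll_id"]})
```

Something like `for hit in scroll_all(elastic_search_url, "data", {"match_all": {}}): ...` would then walk the whole index sequentially, which is exactly the single-cursor bottleneck.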

I had a similar problem in the past when working with a dataset in MongoDB, but there, opening separate processes was quite easy. For example:

    import math
    import multiprocessing

    n_cores = 30
    collection_size = collectionTolookfor.count()
    # documents per process, rounded up so the batches cover the whole collection
    batch_size = math.ceil(collection_size / n_cores)
    skips = range(0, n_cores*batch_size, batch_size)
    # hitting service on this http://130.20.47.179:8012 server
    # one process per batch; each gets its own skip offset and batch size
    processes = [ multiprocessing.Process(target=FTExtraction.entry, args=(full_Text_articles_English , skip_n, batch_size)) for skip_n in skips]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

I tried a similar approach in Elasticsearch, using "from" + "size" instead of scroll. For example:

    fromPtrPool = range(0, 30000+1,6000)
    processes = [ Process(target=create_es_pickle, args=(datalake_url, elastic_search_url, ack_year_month , fromPtr, 6000)) for fromPtr in fromPtrPool]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

Each process sends its request with the code below:

    def create_es_pickle(--with all arguments--):
        resp = requests.post(elastic_search_url +
                             '/data/_search?from={}'.format(fromPtr), json=query).json()

This gives the following error in the response object:

    {'error': {'root_cause': [{'type': 'query_phase_execution_exception', 'reason': 'Result window is too large, from + size must be less than or equal to: [10000] but was [30000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.'}], 'type': 'search_phase_execution_exception', 'reason': 'all shards failed', 'phase': 'query', 'grouped': True, 'failed_shards': [{'shard': 0, 'index': 'data', 'node': 'jt018XEgT6aIjIbXCZfZdg', 'reason': {'type': 'query_phase_execution_exception', 'reason': 'Result window is too large, from + size must be less than or equal to: [10000] but was [30000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.'}}], 'caused_by': {'type': 'query_phase_execution_exception', 'reason': 'Result window is too large, from + size must be less than or equal to: [10000] but was [30000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.'}}, 'status': 500}

All but one of the 6 processes terminated instantly; the remaining one carried on fetching its records.
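The arithmetic behind this can be checked with a small sketch. It assumes that the 6000 passed to each process ends up as the request's "size", and it uses the limit of 10000 quoted in the error message; `check_windows` is a hypothetical helper name.

```python
def check_windows(offsets, size, max_result_window=10000):
    """Map each "from" offset to True if from + size fits inside the window."""
    return {start: start + size <= max_result_window for start in offsets}


# Same offsets as fromPtrPool above, with an assumed page size of 6000.
results = check_windows(range(0, 30000 + 1, 6000), size=6000)
for start, ok in results.items():
    print(start, start + 6000, "ok" if ok else "rejected")
```

Under these assumptions only the request starting at offset 0 stays within the window, and the offset 24000 gives the 30000 reported in the error, which would be consistent with five of the six processes failing immediately.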

I am a newbie with ES and would appreciate suggestions on how I can open multiple connections/cursors/scrollers that chunk my big dataset according to the above scheme.
Please suggest.
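One direction that seems to match this scheme is the sliced scroll feature of the scroll API, where each search request carries a `slice` object with an `id` and a `max`, and each slice gets its own independent scroll. The sketch below is an untested outline under assumptions, not a confirmed solution: `post_json`, `scroll_slice` and `scroll_parallel` are hypothetical names, threads are swapped in for processes because the work is mostly waiting on HTTP, and `max_slices` has to be at least 2.

```python
import json
import urllib.request
from concurrent.futures import ThreadPoolExecutor


def post_json(url, body):
    """POST a JSON body and return the decoded JSON response."""
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


def scroll_slice(es_url, index, query, slice_id, max_slices,
                 size=100, keep_alive="5m", post=post_json):
    """Collect all hits of one slice; every slice has its own scroll context."""
    body = {"size": size, "query": query,
            "slice": {"id": slice_id, "max": max_slices}}
    page = post("{}/{}/_search?scroll={}".format(es_url, index, keep_alive), body)
    hits = []
    while page["hits"]["hits"]:
        hits.extend(page["hits"]["hits"])
        page = post("{}/_search/scroll".format(es_url),
                    {"scroll": keep_alive, "scroll_id": page["_scroll_id"]})
    return hits


def scroll_parallel(es_url, index, query, max_slices=5, **kwargs):
    """Run one scroll per slice concurrently; returns one hit list per slice."""
    with ThreadPoolExecutor(max_workers=max_slices) as pool:
        futures = [pool.submit(scroll_slice, es_url, index, query,
                               i, max_slices, **kwargs)
                   for i in range(max_slices)]
        return [f.result() for f in futures]
```

A call such as `scroll_parallel(elastic_search_url, "data", {"match_all": {}}, max_slices=5)` would play the role of the five 6000-document chunks above, without relying on "from" offsets and so without running into the result window limit.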

