How to use Elasticsearch sliced scroll with multiprocessing in Python?

I'm pulling data from Elasticsearch with the Python client's scroll API and appending the hits to a pandas DataFrame, as follows:

import pandas as pd
from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
index_columns = ['a','b','c',...............]
message_body = {"size": 1000, "_source": index_columns, "query": {"match_all": {}}}

# Initial search opens the scroll context; 'scroll' is how long it stays alive
elastic_data = es.search(index="data", body=message_body, scroll='1m')
frames = [pd.DataFrame([a['_source'] for a in elastic_data['hits']['hits']])]
sid = elastic_data['_scroll_id']
scroll_size = len(elastic_data['hits']['hits'])

# Keep scrolling until a page comes back empty
while scroll_size > 0:
    elastic_data_rest = es.scroll(scroll_id=sid, scroll='1m')
    frames.append(pd.DataFrame([a['_source'] for a in elastic_data_rest['hits']['hits']]))
    sid = elastic_data_rest['_scroll_id']
    scroll_size = len(elastic_data_rest['hits']['hits'])

# A single concat at the end replaces DataFrame.append, which is quadratic
# inside a loop and was removed in pandas 2.0
at_data = pd.concat(frames, ignore_index=True, sort=False)

The above works, but it takes a long time on a large index.
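For comparison, the manual scroll loop can also be handed off to the elasticsearch.helpers.scan helper, which pages through the results internally and clears the scroll context when done. A minimal sketch, reusing the es client and index_columns defined above:

from elasticsearch import helpers

# scan() is a generator over all matching hits; it manages the scroll id
# internally and clears the scroll context when the iterator is exhausted
hits = helpers.scan(
    es,
    index="data",
    query={"_source": index_columns, "query": {"match_all": {}}},
    size=1000,
    scroll='1m',
)
at_data = pd.DataFrame([hit['_source'] for hit in hits])

This removes the manual bookkeeping, but it still reads the index serially.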

Would a sliced scroll combined with a multiprocessing pool pull the data faster, or is there another way?

I have gone through these:

#817
https://www.codestudyblog.com/cnb2010/1006124017.html
How to use elasticsearch sliced scroll with multithreading in python? · Issue #1527 · elastic/elasticsearch-dsl-py (github.com)

and tried some of the approaches, but with no luck.

Thanks

The method below was running indefinitely: inside the while loop, sid and scroll_size were refreshed from the initial elastic_data response instead of elastic_data_rest, so scroll_size never changed. Corrected version:

import gc
import multiprocessing

NJOBS = 4

def es_scroll(index, slice_no):
    # Each worker builds its own client; connections should not be shared
    # across forked processes
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    # Each slice is an independent, non-overlapping cut of the same result set
    message_body = {"slice": {"id": slice_no, "max": NJOBS},
                    "size": 1000,
                    "_source": index_columns,
                    "query": {"match_all": {}}}
    elastic_data = es.search(index=index, body=message_body, scroll='1m')
    frames = [pd.DataFrame([a['_source'] for a in elastic_data['hits']['hits']])]
    sid = elastic_data['_scroll_id']
    scroll_size = len(elastic_data['hits']['hits'])

    while scroll_size > 0:
        elastic_data_rest = es.scroll(scroll_id=sid, scroll='1m', request_timeout=30)
        frames.append(pd.DataFrame([a['_source'] for a in elastic_data_rest['hits']['hits']]))
        # Read sid and scroll_size from the scroll response (elastic_data_rest),
        # not the initial search response, or the loop never terminates
        sid = elastic_data_rest['_scroll_id']
        scroll_size = len(elastic_data_rest['hits']['hits'])

    es.clear_scroll(body={'scroll_id': sid})
    gc.collect()
    return pd.concat(frames, ignore_index=True, sort=False)

def build_parameters(index):
    # One (index, slice_id) tuple per pool worker
    return [(index, num) for num in range(NJOBS)]

parameters = build_parameters("data")

# The __main__ guard is required for multiprocessing on spawn-based
# platforms (Windows, and macOS on recent Python versions)
if __name__ == '__main__':
    with multiprocessing.Pool(processes=NJOBS) as pool:
        result = pool.starmap(es_scroll, parameters)
    frame = pd.concat(result, ignore_index=True, sort=False)
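Two things are worth checking if the sliced version still underperforms. First, the Elasticsearch docs recommend keeping the number of slices at or below the number of primary shards of the index; asking for more slices than shards makes the first call for each slice expensive. The shard count can be read from the index settings, assuming the index is named "data" as above:

# number_of_shards comes back as a string in the settings response
settings = es.indices.get_settings(index="data")
num_shards = int(settings["data"]["settings"]["index"]["number_of_shards"])

Second, each worker must create its own Elasticsearch client, as es_scroll now does; a client created in the parent process is not safe to reuse after fork.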
