Simultaneously executing multiple Scroll API queries to fetch large data

How do I fetch a large amount of data from Elasticsearch in parallel?

Using the Scroll API I am able to fetch the complete data set from Elasticsearch, but it is too slow: it takes around 40 seconds to fetch 500,000 records.

I am trying to use the multiprocessing module in Python to fetch data from the Scroll API simultaneously.

Here are the details of the code:

import json
import multiprocessing
import requests

# Initial search; "scroll=20m" opens a scroll context kept alive for 20 minutes.
elastic_url = 'http://0.0.0.0/' + index_name + '/_search?scroll=20m'
scroll_api_url = 'http://0.0.0.0/_search/scroll'
r1 = requests.request(
    "POST",
    elastic_url,
    data=json.dumps(query_body),
    headers=headers,
)


process = []
# Create 5 processes, each calling fetch_data_scroll_id, and start them.
# Note that every process receives the same scroll_payload, i.e. the same scroll ID.
for _ in range(5):
    p1 = multiprocessing.Process(target=fetch_data_scroll_id, args=([scroll_payload],))
    p1.start()
    process.append(p1)
# join() so the main process waits for every worker, even those that finish early.
for p in process:
    p.join()
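
fetch_data_scroll_id is not shown in this post; a minimal sketch of what such a worker presumably looks like, reusing scroll_api_url and headers from above (the loop body is an assumption, not the actual implementation):

def fetch_data_scroll_id(payloads):
    # The call site passes args=([scroll_payload],), so the worker receives a
    # one-element list of payloads shaped like {"scroll": "20m", "scroll_id": ...}.
    for scroll_payload in payloads:
        while True:
            r = requests.post(
                scroll_api_url,
                data=json.dumps(scroll_payload),
                headers=headers,
            )
            page = r.json()
            hits = page.get("hits", {}).get("hits", [])
            if not hits:
                break
            # ... collect/process the hits here ...
            # Each response carries the scroll ID to use for the next request.
            scroll_payload["scroll_id"] = page["_scroll_id"]

A scroll ID identifies a single cursor, so five processes replaying the same scroll_id advance the same cursor and can request contexts that have already been released, which is what the search_context_missing_exception below reports.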

I get the following error:
{
    "_scroll_id": "DnF1ZXJ5VGhlbkXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXc3UXJHUy1PNUdvQ2lyencAAAAAACCS6BY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3AAAAAAAgkuYWNG13SjdZRzdRckdTLU81R29DaXJ6dwAAAAAAIJLpFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACCS5xY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3",
    "_shards": {
        "failures": [
            {
                "index": null,
                "shard": -1,
                "reason": {
                    "type": "search_context_missing_exception",
                    "reason": "No search context found for id [2134760]"
                }
            },
            {
                "index": "logstash-2020.07.07",
                "reason": {
                    "type": "search_context_missing_exception",
                    "reason": "No search context found for id [2134757]"
                },
                "shard": 0,
                "node": "XXXXXXXXXXXXXXXXXXX"
            }
        ],
        "total": 5,
        "successful": 3,
        "skipped": 0,
        "failed": 2
    },
    "timed_out": false,
    "took": 17,
    "hits": {
        "total": 106157,
        "max_score": null,
        "hits": []
    },
    "terminated_early": true
}

Please help.

@N220
You need to use sliced scroll. You can process multiple slices simultaneously. https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html#sliced-scroll
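
For illustration, a sliced scroll adds a "slice" clause to the initial search request; each slice is opened by its own request and returns its own independent scroll ID, which can then be scrolled in parallel. A minimal sketch (the index name and query here are placeholders, not from this thread):

import json
import requests

elastic_url = 'http://0.0.0.0/my_index/_search?scroll=20m'   # placeholder index
headers = {'Content-Type': 'application/json'}

body = {
    'query': {'match_all': {}},     # placeholder; use the real query here
    'sort': '_doc',                 # _doc order is the cheapest sort for scrolling
    'size': 10000,
    'slice': {'id': 0, 'max': 5},   # slice 0 of 5; repeat with id = 1..4
}
r = requests.post(elastic_url, data=json.dumps(body), headers=headers)
slice_scroll_id = r.json()['_scroll_id']   # scroll this ID independently of the others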

Thanks Vinayak! Will check it out.

Hi Vinayak,
I am planning to implement it this way: there are 5 shards, so I will create slice ids 0-4, get the five scroll IDs, and then fetch the data using multiprocessing.

Is that how it needs to be implemented?

Correct.

But I am still facing the same issue:

{
    "timed_out": false,
    "took": 1,
    "terminated_early": true,
    "hits": {
        "total": 105216,
        "hits": [],
        "max_score": null
    },
    "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAIJyxFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACCcsBY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3AAAAAAAgnLIWNG13SjdZRzdRckdTLU81R29DaXJ6dwAAAAAAIJy0FjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACCcsxY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3",
    "_shards": {
        "total": 5,
        "skipped": 0,
        "successful": 5,
        "failed": 0
    }
}

The requests are being terminated early. Any idea why?

Hi Vinayak,
I have resolved the issue. Now I am able to execute the queries in parallel, but I don't see any improvement in performance.
Here is my code:

max_slice = 5
# 5 processes, one per slice; scroll_payload_list holds the 5 scroll IDs.
process = []
for i in range(max_slice):
    # i % max_slice is just i here, so each worker scrolls its own slice.
    p1 = multiprocessing.Process(target=fetch_data_scroll_id, args=([scroll_payload_list[i]],))
    p1.start()
    process.append(p1)
for p in process:
    p.join()

To fetch 500,000 records it used to take around 30 seconds; it still takes around 26 seconds.
Please give me any suggestions to improve the performance.
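
For context, scroll_payload_list is built by opening one scroll per slice before the workers start. The exact construction is not shown in this post, but it is presumably along these lines (reusing query_body, elastic_url, headers, and max_slice from the earlier snippets):

scroll_payload_list = []
for slice_id in range(max_slice):
    body = dict(query_body)                          # copy of the base query
    body['slice'] = {'id': slice_id, 'max': max_slice}
    r = requests.post(elastic_url, data=json.dumps(body), headers=headers)
    scroll_payload_list.append({
        'scroll': '20m',
        'scroll_id': r.json()['_scroll_id'],         # one independent cursor per slice
    })

Each worker then scrolls its own slice to exhaustion, as in the fetch_data_scroll_id sketch earlier.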

Are you seeing exceptions? Are you getting hits in the first iteration or in subsequent iterations while scrolling a single slice? What version of ES are you using?

Early termination is not a failure; it just means the whole query was not executed.

If possible, post your query_body.

Have you tried to identify whether any resource is limiting performance? What is the hardware specification of your cluster? What do disk I/O and iowait look like on the data nodes while you are extracting? Do you see any evidence in the logs of long or frequent GC?

Hi Vinayak,
There are no exceptions. I am getting hits.
Here are the details:

I think the records are being fetched, but I am not able to understand why there is no improvement in performance.

{
    "query": {
        "bool": {
            "must": [
                {
                    "match_phrase": {
                        "type": "lab_name"
                    }
                }
            ]
        }
    },
    "sort": "_doc",
    "size": 10000,
    "slice": {
        "id": 4,
        "max": 5
    }
}

<Process(Process-1, started)>
process id: 4245
<Process(Process-2, started)>
process id: 4246
<Process(Process-3, started)>
process id: 4247
<Process(Process-4, started)>
process id: 4248
<Process(Process-5, started)>
process id: 4249
{
    "terminated_early": true,
    "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAIKGcFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACChmxYXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX9DaXJ6dwAAAAAAIKGdFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACChnxY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3",
    "timed_out": false,
    "took": 40,
    "_shards": {
        "skipped": 0,
        "failed": 0,
        "successful": 5,
        "total": 5
    },
    "hits": {
        "max_score": null,
        "total": 105216,
        "hits": []
    }
}
95216 length of records fetched
{
    "terminated_early": true,
    "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAIKGgFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACChoRY0bXdXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXhpBY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3",
    "timed_out": false,
    "took": 1,
    "_shards": {
        "skipped": 0,
        "failed": 0,
        "successful": 5,
        "total": 5
    },
    "hits": {
        "max_score": null,
        "total": 105518,
        "hits": []
    }
}
95518 length of records fetched
{
    "terminated_early": true,
    "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAIKGWFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACChlxY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3AAAAAAAgoZgWNG13SjdZRzdRckdXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXhmhY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3",
    "timed_out": false,
    "took": 1,
    "_shards": {
        "skipped": 0,
        "failed": 0,
        "successful": 5,
        "total": 5
    },
    "hits": {
        "max_score": null,
        "total": 106157,
        "hits": []
    }
}
96157 length of records fetched
{
    "terminated_early": true,
    "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAIKGmFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACChpRY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3AAAAAAAgoakWNXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXncAAAAAACChqBY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3",
    "timed_out": false,
    "took": 1,
    "_shards": {
        "skipped": 0,
        "failed": 0,
        "successful": 5,
        "total": 5
    },
    "hits": {
        "max_score": null,
        "total": 106178,
        "hits": []
    }
}
96178 length of records fetched
{
    "terminated_early": true,
    "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAIKGrFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXR29DaXJ6dwAAAAAAIKGuFjRtd0o3WUc3UXJHUy1PNUdvQ2lyencAAAAAACChrxY0bXdKN1lHN1FyR1MtTzVHb0Npcnp3",
    "timed_out": false,
    "took": 1,
    "_shards": {
        "skipped": 0,
        "failed": 0,
        "successful": 5,
        "total": 5
    },
    "hits": {
        "max_score": null,
        "total": 106028,
        "hits": []
    }
}
96028 length of records fetched
Total processes:5 
Time taken to fetch data 23.921



Hi Christian_Dahlqvist,

I don't know how to check the hardware specification of the cluster. Can you give me more info on the commands that I have to use?

Thanks!

What is the specification of the host(s) the cluster is running on? Whoever set up the cluster should know this.

Hi Vinayak,
Any suggestions?

If you have some resource bottleneck in the cluster, e.g. disk performance, trying to do more in parallel may not result in any improvement. I therefore recommend having a look at resource utilization as described earlier.

Hi Christian_Dahlqvist,
Thanks for the response; I will surely look into it. I did experiment with increasing the number of processes, but there was no improvement. Any idea where I can learn more about those commands?

You need to share information about the cluster hardware and use e.g. iostat to get information about disk utilization. Also look in the logs for signs of long and/or frequent GC. You can also use top to look at CPU usage.

Without this information it is hard to help.

Hi Christian_Dahlqvist,
Sure. I will ask the admin to investigate the logs for unusual I/O activity.

Thanks again!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.