How to scroll through an Elasticsearch index using elasticsearch-spark?

With the Java Client.prepareSearch() and Client.prepareSearchScroll() APIs, we can query an Elasticsearch index using scrolls, as described in the documentation. With these APIs we can request only a specific number of hits per request by setting SearchRequestBuilder.setSize(). The SearchResponse provides a scroll id, which is then used in the subsequent request.
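For reference, this is roughly the pattern I mean (a minimal sketch; it assumes an already-built org.elasticsearch.client.Client, and the index name "my-index", the page size, and the scroll timeout are placeholders):

```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class ScrollExample {
    // "client" is an already-built Client (e.g. a TransportClient); "my-index" is a placeholder.
    static void scrollAll(Client client) {
        SearchResponse response = client.prepareSearch("my-index")
                .setQuery(QueryBuilders.matchAllQuery())
                .setScroll(new TimeValue(60000))   // keep the scroll context alive for 60 seconds
                .setSize(100)                      // hits returned per scroll request
                .execute().actionGet();

        while (response.getHits().getHits().length > 0) {
            for (SearchHit hit : response.getHits().getHits()) {
                // process each hit here
            }
            // use the scroll id from the previous response to fetch the next page
            response = client.prepareSearchScroll(response.getScrollId())
                    .setScroll(new TimeValue(60000))
                    .execute().actionGet();
        }
    }
}
```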

How can one use elasticsearch-spark to implement similar functionality? All JavaEsSpark.esRDD() methods return a JavaPairRDD, which would contain all hits. Is there a way to request only a specific number of hits per request and then continue scrolling with further requests?

I found the configuration es.scroll.size, which seems equivalent to SearchRequestBuilder.setSize(), but I am not sure how to use it, or how the scroll ids would be handled in the context of elasticsearch-spark.

ES-Hadoop uses the scroll endpoint to collect all the data for processing within Spark. It performs the multiple scroll requests under the hood, requesting the next scroll page after the data in the current scroll response has been fully consumed. I'm not sure I understand what you're looking for in terms of advancing the scroll request on your own. Could you elaborate on your use case?


Thanks @james.baiera for your reply. The Elasticsearch documents in our indices have a field that stores a list of objects. I want to fetch the documents from ES using a query, access the list of objects in the above-mentioned field, and then apply certain filters to that list of objects. I also want to stop execution once I have obtained the intended number of objects from the fetched documents. Because of certain restrictions related to our data storage formats, I can't apply the filters on the ES side.

For this, I could use the Client.prepareSearch() and Client.prepareSearchScroll() APIs to fetch a specific number of documents into memory per scroll and apply the filters to the list field, continuing to scroll until the expected number of objects has been extracted.

I am not sure how similar functionality can be implemented with elasticsearch-spark.

You could always read the documents from Elasticsearch, then extract the list items into their own records with a flatMap operation. Once your filters are applied to the RDD/Dataset, tell Spark that you want to take X results from it.
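Something along these lines (a minimal sketch, assuming Spark 2.x's Java API and ES-Hadoop's JavaEsSpark; the node address, the "my-index/my-type" resource, the "items" field, and the filter predicate are placeholders for your own values):

```java
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class FlatMapExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("es-flatmap-example")
                .set("es.nodes", "localhost:9200");           // assumption: a local cluster
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Each value is the document source as a Map of field name -> value.
        JavaRDD<Map<String, Object>> docs =
                JavaEsSpark.esRDD(jsc, "my-index/my-type").values();   // placeholder resource

        // "items" is the hypothetical list field; the cast assumes ES-Hadoop
        // hands it back as a java.util.List of objects (maps).
        JavaRDD<Map<String, Object>> items = docs.flatMap(
                doc -> ((List<Map<String, Object>>) doc.get("items")).iterator());

        // Apply the client-side filter, then ask Spark for the first 100 matches.
        List<Map<String, Object>> firstMatches = items
                .filter(item -> "interesting".equals(item.get("status")))  // placeholder predicate
                .take(100);

        jsc.stop();
    }
}
```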

If you have a substantial number of records to go through, asking Spark to take a number of results may still compute the full result set. You can try to limit the total number of records returned from ES-Hadoop to Spark by setting es.scroll.limit, which tells the connector to preemptively stop reading results from a scroll request once it has read that many documents. You might need to experiment with the value you set there so that you always end up with your minimum number of documents.
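For example, the settings would go into the SparkConf something like this (a sketch; the node address and the limit value are placeholders you would tune for your data):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("es-scroll-limit-example")
        .set("es.nodes", "localhost:9200")    // assumption: a local cluster
        .set("es.scroll.size", "100")         // hits fetched per scroll request
        // Illustrative value: stop reading from a scroll once this many documents
        // have been returned; tune it so enough documents survive the client-side
        // filter to reach the number of items you need.
        .set("es.scroll.limit", "10000");
JavaSparkContext jsc = new JavaSparkContext(conf);
```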


Thank you. I will experiment with the es.scroll.limit config.
