Retrieving over a million records in Elasticsearch

Hello,

I believe this is a problem widely talked about in the Elasticsearch community and scan/scroll is a useful method for this problem. However, I am unable to get my targeted results.

Problem: I want to be able to return ~1 million records from Elasticsearch under a target latency of 1 second (Elasticsearch + network latency).

Problem Parameters:
Elasticsearch version: 1.7

Query I am running:

{
    "fields": [
        "_id"
    ],
    "filter": {
        "bool": {
            "should": [
                {
                    "term": {
                        "foo": "bar"
                    }
                },
                {
                    "term": {
                        "baz": [
                            "qux"
                        ]
                    }
                }
            ]
        }
    }
}

Note: I only require the _id fields of the documents.

My Elasticsearch cluster contains 6 nodes (all are master & data nodes) with 4 cores, 32 GB of RAM. The index itself has 5 shards and 2 sets of replicas.

What I have tried:

  1. Setting the 'size' parameter to get all the records in one request. Elasticsearch reports 10 seconds in the 'took' field, and it takes 6 seconds for the data to be serialized, transferred over the network and deserialized on the client end. This is obviously unacceptable.

  2. Using the scan/scroll method. This is widely talked about, so I decided to try it with different batch sizes: 10, 100, 1000, 10,000, 100,000 and 1,000,000. It took way too long with sizes of 10, 100 and 1000. It took about 19-20 seconds with batch sizes of 100,000 and 1,000,000, which is significantly worse than retrieving all the documents at once with the regular query.

  3. I also tried using different 'search_type' in the parameter hoping that 'query_and_fetch' would perform better. However, that doesn't seem to give any performance gains.
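For reference, the scan/scroll attempt in item 2 can be sketched roughly as below. This is a minimal sketch, not the exact client code I ran: `send` is a hypothetical transport callable (not part of any Elasticsearch client) that performs the HTTP request and returns the parsed JSON response, and the index name is whatever you pass in. It assumes the 1.x behaviour where the initial scan request returns only a `_scroll_id` and the scroll id is sent back as the request body.

```python
import json


def scan_scroll_ids(send, index, body, size_per_shard, scroll="1m"):
    """Collect every matching _id with a scan-type scroll.

    `send(method, path, params, body)` is a hypothetical transport callable
    that issues the HTTP request and returns the parsed JSON response.
    """
    # With search_type=scan, `size` applies per shard, not per request.
    request = dict(body, size=size_per_shard)
    first = send("GET", "/%s/_search" % index,
                 {"search_type": "scan", "scroll": scroll},
                 json.dumps(request))
    scroll_id = first["_scroll_id"]

    ids = []
    while True:
        # Each scroll call returns the next batch plus a fresh scroll id.
        page = send("GET", "/_search/scroll", {"scroll": scroll}, scroll_id)
        hits = page["hits"]["hits"]
        if not hits:
            break  # an empty batch means the scroll is exhausted
        ids.extend(hit["_id"] for hit in hits)
        scroll_id = page["_scroll_id"]
    return ids
```

Because the scan size is per shard, a size of 10,000 on a 5-shard index can return up to 50,000 hits per round trip.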

However, when I run a 'count' query, the result is super quick, ~10 milliseconds. This indicates that the bottleneck is in the retrieval of the documents between the gateway node and the corresponding shards. Since the query is a filter query, there should be no sorting at all during retrieval or search. I guess the underlying question is: is there any way I can get this result in under a second? My hosts are on a 1 Gigabit connection, so as long as Elasticsearch can aggregate the results quickly enough, I should be good to go?
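As a back-of-the-envelope check on the network side of that budget, here is a small calculation. The 100 bytes per hit is purely an assumption about the JSON size of one hit that carries only metadata such as _id; I have not measured it, and protocol overhead and serialization time are ignored.

```python
def transfer_seconds(num_hits, bytes_per_hit, link_bits_per_second):
    """Time to push the raw payload over the link, ignoring all overhead."""
    payload_bytes = num_hits * bytes_per_hit
    return payload_bytes * 8 / link_bits_per_second


NUM_HITS = 1_000_000
BYTES_PER_HIT = 100         # assumption: rough JSON size of one _id-only hit
LINK_BPS = 1_000_000_000    # 1 Gigabit per second

seconds = transfer_seconds(NUM_HITS, BYTES_PER_HIT, LINK_BPS)
print("%.2f s on the wire" % seconds)
```

Under that assumption the transfer alone already uses most of a 1 second budget, so the per-hit payload size matters as much as the fetch time.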

I have a couple of potential ideas for this.

  1. I believe filters in Elasticsearch use a bitset to represent the matching documents and aggressively caches these bitsets. It would be really useful if I could get access to the bitset and somehow get the mapping to the document _ids. Would it be possible to write a plug-in to get this data directly?

  2. Retrieve the data directly without serializing to JSON in Elasticsearch via a custom plug-in maybe in a binary format?

I do understand Elasticsearch is not traditionally meant to return a million records in under a second (no search engine really optimizes for this in reality, at least I don't think so). But it would be a great fit for a project, and if I can get this to work, it'll be perfect. I appreciate any help. If there is any information I have forgotten to provide, please let me know.

At this point you are better off making 3 of them master-eligible data nodes and the other 3 just data nodes.

Or the scoring. You should see if it gets faster if you sort by _doc.

It could also be fetching the _ids.

You should use the hot_threads API to see what is taking the time.

The bitsets aren't of _ids. They are at the Lucene segment level and _id is a thing Elasticsearch is inserting on top of that. Depending on your query it may not even use the cache - if it needs scores it won't. If it is super fast without the cache (term query) then it'll skip it as well.

What do you want to do with the results? Elasticsearch's aggregations were built to do interesting things with portions of the documents after applying arbitrary filters. You might have a similar problem. I mean, maybe it's one that can be solved with an aggregation. Or maybe it is one that we just need to better understand.

Does this actually boost performance? I'm not in a production scenario at the moment. There isn't any load on the cluster as such.

I used a filter query, which should not be doing any sorting, correct? Also, what does sorting by _doc achieve?

I used the hot_threads API while executing the query and it seems like it is the fetch_phase from the gateway node. I sampled every second to see what threads were taking time. Here is the output:

100.3% (1s out of 1s) cpu usage by thread 'name_of_elasticsearch_cluster_node][search][T#3]'
4/10 snapshots sharing following 22 elements
sun.nio.ch.NativeThread.current(Native Method)
sun.nio.ch.NativeThreadSet.add(NativeThreadSet.java:46)
sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:737)
sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:727)
org.apache.lucene.store.NIOFSDirectory$NIOFSIndexInput.readInternal(NIOFSDirectory.java:179)
org.apache.lucene.store.BufferedIndexInput.refill(BufferedIndexInput.java:342)
org.apache.lucene.store.BufferedIndexInput.readByte(BufferedIndexInput.java:54)
org.apache.lucene.store.DataInput.readVInt(DataInput.java:122)
org.apache.lucene.store.BufferedIndexInput.readVInt(BufferedIndexInput.java:221)
org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.visitDocument(CompressingStoredFieldsReader.java:249)
org.apache.lucene.index.SegmentReader.document(SegmentReader.java:335)
org.elasticsearch.search.fetch.FetchPhase.loadStoredFields(FetchPhase.java:427)
org.elasticsearch.search.fetch.FetchPhase.createSearchHit(FetchPhase.java:219)
org.elasticsearch.search.fetch.FetchPhase.execute(FetchPhase.java:184)
org.elasticsearch.search.SearchService.executeFetchPhase(SearchService.java:401)
org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryFetchTransportHandler.messageReceived(SearchServiceTransportAction.java:833)
org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryFetchTransportHandler.messageReceived(SearchServiceTransportAction.java:824)
org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler.doRun(MessageChannelHandler.java:279)
org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:36)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

This was repeated across multiple threads on the gateway node. On one of the other nodes, this was the output:

99.7% (997.3ms out of 1s) cpu usage by thread 'elasticsearch[other_node_on_cluster][search][T#7]'
3/10 snapshots sharing following 22 elements
sun.nio.ch.NativeThread.current(Native Method)
sun.nio.ch.NativeThreadSet.add(NativeThreadSet.java:46)
sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:737)
sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:727)
org.apache.lucene.store.NIOFSDirectory$NIOFSIndexInput.readInternal(NIOFSDirectory.java:179)
org.apache.lucene.store.BufferedIndexInput.refill(BufferedIndexInput.java:342)
org.apache.lucene.store.BufferedIndexInput.readByte(BufferedIndexInput.java:54)
org.apache.lucene.codecs.compressing.LZ4.decompress(LZ4.java:110)
org.apache.lucene.codecs.compressing.CompressionMode$4.decompress(CompressionMode.java:135)
org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.visitDocument(CompressingStoredFieldsReader.java:354)
org.apache.lucene.index.SegmentReader.document(SegmentReader.java:335)
org.elasticsearch.search.fetch.FetchPhase.loadStoredFields(FetchPhase.java:427)
org.elasticsearch.search.fetch.FetchPhase.createSearchHit(FetchPhase.java:219)
org.elasticsearch.search.fetch.FetchPhase.execute(FetchPhase.java:184)
org.elasticsearch.search.SearchService.executeFetchPhase(SearchService.java:401)
org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryFetchTransportHandler.messageReceived(SearchServiceTransportAction.java:833)
org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryFetchTransportHandler.messageReceived(SearchServiceTransportAction.java:824)
org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler.doRun(MessageChannelHandler.java:279)
org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:36)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

Is this conclusive enough to point out that it is the fetch_phase that is causing the slowness?
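To make that reading a bit less anecdotal, repeated hot_threads samples could be tallied by their innermost Elasticsearch frame. The following is only a sketch of that idea; the helper names are made up for illustration, and it assumes each dump lists frames innermost first, as in the output above.

```python
import re
from collections import Counter

# Matches a fully qualified org.elasticsearch method name right before "(".
FRAME = re.compile(r"(org\.elasticsearch\.[\w.$]+)\(")


def first_es_frame(stack_text):
    """Return the innermost org.elasticsearch frame of one thread dump, or None."""
    match = FRAME.search(stack_text)
    return match.group(1) if match else None


def tally(dumps):
    """Count how often each innermost Elasticsearch frame appears across dumps."""
    return Counter(frame for frame in map(first_es_frame, dumps) if frame)


sample = """sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:727)
org.apache.lucene.index.SegmentReader.document(SegmentReader.java:335)
org.elasticsearch.search.fetch.FetchPhase.loadStoredFields(FetchPhase.java:427)
org.elasticsearch.search.fetch.FetchPhase.createSearchHit(FetchPhase.java:219)"""

print(tally([sample, sample]))
```

If nearly every sample lands on the same frame, that is reasonable evidence that the time is going there.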

I do believe aggregations could potentially solve the problem, but it might be useful for us to have this data regardless. If it doesn't work, we will probably move to aggregations. Is it possible to retrieve all this data in under 1 second?

No, but it is one of the first steps in having a stable cluster. It won't change your fetch times though.

It's just a fairly simple way of saying "I don't ever want scoring".
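As a sketch of what that request could look like, the snippet below adds a `_doc` sort to the search body from the original post (trimmed to one term filter). Whether `_doc` is accepted as a sort key depends on the Elasticsearch version, and this has not been checked against 1.7, so treat it as an assumption. The helper name is made up for illustration.

```python
import json


def with_doc_order(body):
    """Return a copy of a search body that asks for index order, not score order."""
    out = dict(body)
    out["sort"] = ["_doc"]  # assumption: this version accepts _doc as a sort key
    return out


query = {"fields": ["_id"], "filter": {"term": {"foo": "bar"}}}
print(json.dumps(with_doc_order(query), indent=2))
```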

What version are you using, btw?

Yes. It's the transition from the Lucene level docId to some stored field. Are you using "_source": false on the query? I suspect you are but I want to make sure.

If count can count it but you aren't getting it now the answer is still "maybe".

The place where aggregations help here is that they don't hit that code at all - they are allowed to load values using doc values rather than stored fields.

To be honest I'm not sure where to go from here. There was a lot of discussion around a somewhat similar thing over here which makes me think you could do something sneaky with a secondary, not_analyzed id field and then using a plugin to perform your search, fetching secondary id field using doc values. It'd be a lot of work I think, but it'd duck this slow thing.

Easier might be to just try some of the built-in aggregations across your data - just sum up one of the integer fields or something. Keep the size parameter small, maybe 0. If the request with the aggregation comes back fast enough you have a way forward: do what you want to do with aggregations and/or implement that plugin. If it doesn't come back fast enough then it may not be possible. But you can do the same hot_threads business and look again and maybe figure out an angle of attack.
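A rough sketch of such a test request, built as a plain dictionary: the field name `some_int_field` and the aggregation name `total` are placeholders, and the filter is a trimmed version of the one in the original post. It assumes the 1.x `filtered` query is available for wrapping the filter.

```python
import json


def sum_agg_request(filter_clause, field):
    """Build a search body that returns no hits, only a sum over `field`."""
    return {
        "size": 0,  # skip the fetch phase for hits entirely
        "query": {"filtered": {"filter": filter_clause}},
        "aggs": {"total": {"sum": {"field": field}}},
    }


body = sum_agg_request({"term": {"foo": "bar"}}, "some_int_field")
print(json.dumps(body, indent=2))
```

Compare the 'took' value of this request with the one for the full fetch to see how much of the time is spent loading hits.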

I am trying to retrieve the Lucene level doc_id though. Isn't that what '_id' is?

Yes, I only specify the '_id' field to be returned, which is the same as "_source": false.

Could you explain this in a little more depth? By having a secondary, not_analyzed field, aren't we duplicating what _id gives us in Elasticsearch? It would still have to retrieve the _id field, which is what is slowing us down in the first place, correct?

Thanks for the help. I will try out aggregations for sure. I just wanted to exhaust my options to retrieve a large dataset from Elasticsearch.

_id is a string that elasticsearch relies on and you get to pick. docId is the number that Lucene assigns to a document when indexing. They aren't the same thing, sadly.

Not really. I haven't thought it through in any more depth.

You are but _id doesn't have doc values (think of it as a column database hidden inside of Lucene).

Searches have to fetch it, yeah. That is why it'd be a project. You'd have to make quite a few things.

The upshot is that Elasticsearch (and Lucene) aren't designed for fetching large numbers of hits. They are designed for fetching the top N scoring of a large number of hits. Aggregations don't fetch the hits in the traditional sense - they look the data up in nice, column wise, on disk data structures. They don't need the _id. The _id is stored with the _source and has to be unzipped in chunks.
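The difference can be illustrated with a toy model. This is not Lucene's on-disk format: zlib stands in for the real compression, JSON stands in for the stored-field encoding, and the chunk size of 100 documents is arbitrary. It only shows why reading one small field per hit from chunk-compressed rows costs far more work than reading it from a column.

```python
import json
import zlib

# 1000 fake documents; only the small field "n" is wanted per hit.
docs = [{"_id": "doc-%d" % i, "body": "x" * 50, "n": i} for i in range(1000)]
CHUNK = 100  # arbitrary number of documents per compressed chunk

# Row-oriented "stored fields": whole documents, compressed in chunks.
chunks = [
    zlib.compress(json.dumps(docs[i:i + CHUNK]).encode("utf-8"))
    for i in range(0, len(docs), CHUNK)
]


def stored_field(doc_id, name):
    """Decompress and parse a whole chunk just to read one field of one doc."""
    chunk = json.loads(zlib.decompress(chunks[doc_id // CHUNK]).decode("utf-8"))
    return chunk[doc_id % CHUNK][name]


# Column-oriented "doc values": one flat array per field.
n_column = [d["n"] for d in docs]


def doc_value(doc_id):
    """Read the value straight out of the column by position."""
    return n_column[doc_id]


hits = range(0, 1000, 7)
assert [stored_field(h, "n") for h in hits] == [doc_value(h) for h in hits]
```

Both paths return the same values, but the stored-field path decompresses a full chunk for every hit, which is the kind of work showing up in the hot_threads output.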

If I knew more about what you wanted to do with the data once you'd fetched it I'd be more able to help. Still, I think it is worth trying an aggregation on the number of hits you are expecting just so you can get a sense of how fast that'd be.

Alright. Thanks for the advice, it's definitely been helpful.

Hey Nick,

So I took your advice. I tried the aggregation query and it was super quick. I summed up an integer field (that had doc_values enabled on ES 1.7.1) and it came back in ~20 ms. So I wrote a plugin to do what you suggested. The plugin performs a search and it doesn't return the _id field, just the specified doc_value field. However, I still seem to be getting a latency of ~8 seconds to get the data back for 1 million records.

The hot threads API is showing me that:

100.2% (1s out of 1s) cpu usage by thread 'elasticsearch[cluster][search][T#1]'
     3/10 snapshots sharing following 23 elements
       sun.nio.ch.NativeThread.current(Native Method)
       sun.nio.ch.NativeThreadSet.add(NativeThreadSet.java:46)
       sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:737)
       sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:727)
       org.apache.lucene.store.NIOFSDirectory$NIOFSIndexInput.readInternal(NIOFSDirectory.java:179)
       org.apache.lucene.store.BufferedIndexInput.refill(BufferedIndexInput.java:342)
       org.apache.lucene.store.BufferedIndexInput.readByte(BufferedIndexInput.java:54)
       org.apache.lucene.store.DataInput.readVInt(DataInput.java:122)
       org.apache.lucene.store.BufferedIndexInput.readVInt(BufferedIndexInput.java:221)
       org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.visitDocument(CompressingStoredFieldsReader.java:249)
       org.apache.lucene.index.SegmentReader.document(SegmentReader.java:335)
       org.elasticsearch.search.lookup.SourceLookup.loadSourceIfNeeded(SourceLookup.java:70)
       org.elasticsearch.search.lookup.SourceLookup.extractRawValues(SourceLookup.java:145)
       plugin.retrievedocvalues.search.fetch.CustomFetchPhase.createSearchHit(CustomFetchPhase.java:256)
       plugin.retrievedocvalues.search.fetch.CustomFetchPhase.execute(CustomFetchPhase.java:189)
       plugin.retrievedocvalues.search.CustomSearchService.executeFetchPhase(CustomSearchService.java:500)

It seems like the load from the source is the bottleneck again. This seems to be the part where it's trying to load the field (the secondary integer field with doc_values enabled), and it is slow. Any ideas as to why the doc_value field is taking so long to load? The aggregation is super quick, so I don't know why it's so slow to stream the field off the disk.

Well it isn't streaming it - it is building the result in memory and sending it back but...

looks like it is trying to load from source rather than doc values. I'd look at FieldDataFieldsFetchSubPhase to see if that works.
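For comparison with the plugin, a stock request that asks for the field through `fielddata_fields` rather than from `_source` might look like the sketch below. The field name `secondary_id` is a placeholder, and whether the regular fetch phase still reads stored fields for every hit with this request is not something established in this thread, so it is only a starting point for measuring.

```python
import json


def fielddata_request(filter_clause, field, size):
    """Build a search body that returns `field` via fielddata_fields, not _source."""
    return {
        "size": size,
        "_source": False,  # do not return the stored source with each hit
        "query": {"filtered": {"filter": filter_clause}},
        "fielddata_fields": [field],
    }


body = fielddata_request({"term": {"foo": "bar"}}, "secondary_id", 10000)
print(json.dumps(body, indent=2))
```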

Maybe try 'post_filter'ing https://www.elastic.co/guide/en/elasticsearch/reference/1.7/search-request-post-filter.html to just the '_id' and giving scan-scroll a shard batch 'size' of 10k?