Tune Elasticsearch to process thousands of simultaneous searches

I have a question about Elastic Cloud deployments. I created a 3-zone, 32-CPU compute-optimized deployment on GCP with just Elasticsearch (hot data nodes and coordinating nodes only) and Kibana enabled, and used Apache Spark to load a large relational dataset of 50+ million records that I want to query. The dataset consists of several string fields and an integer field.

I have another relational dataset (7 million records) that I want to compare against the first one. So I am again using Apache Spark (160 threads), and each thread sends an msearch request to reduce roundtrips. Each request sends a batch of 30 queries like the following:

{
    "query": {
        "bool": {
            "should": [
                {"match": {"col1": {"query": "mystr", "fuzziness": "AUTO"}}},
                {"match": {"col2": {"query": "otherstr", "fuzziness": "AUTO"}}},
                {"match": {"col3": {"query": "onelaststr", "fuzziness": "AUTO"}}},
                {"range": {"numericstr": {"from": n, "to": m}}}
            ],
            "minimum_should_match": 0
        }
    },
    "sort": {"_score": "desc"},
    "size": 10
}
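The batching itself builds the flattened header/body list that msearch expects, roughly like this sketch (the `build_msearch_body` helper, field values, and row keys are illustrative, and I've used `gte`/`lte` in place of the older `from`/`to` range syntax):

```python
def build_msearch_body(rows, index, batch_size=30):
    """Flattened header/body pairs for one msearch request.
    Each row supplies the values to match (key names are illustrative)."""
    body = []
    for row in rows[:batch_size]:
        body.append({"index": index})  # per-search header
        body.append({
            "query": {
                "bool": {
                    "should": [
                        {"match": {"col1": {"query": row["col1"], "fuzziness": "AUTO"}}},
                        {"match": {"col2": {"query": row["col2"], "fuzziness": "AUTO"}}},
                        {"match": {"col3": {"query": row["col3"], "fuzziness": "AUTO"}}},
                        {"range": {"numericstr": {"gte": row["lo"], "lte": row["hi"]}}},
                    ]
                }
            },
            "size": 10,
        })
    return body

# Then each thread sends something like (8.x Python client):
# responses = es.msearch(searches=build_msearch_body(batch, "myindex"))
```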

As expected, the deployment's CPU reaches 100% immediately after the job starts, and I get response errors like this:

{'error': {'root_cause': [{'type': 'es_rejected_execution_exception', 'reason': 'rejected execution of TimedRunnable{original=ActionRunnable#wrap[org.elasticsearch.search.SearchService ...  org.elasticsearch.common.util.concurrent.EWMATrackingEsThreadPoolExecutor@492e9e87[Running, pool size = 49, active threads = 49, queued tasks = 1000, completed tasks = 18281]

Are there any guidelines for fine-tuning in this case? I'm fine with reducing the number of threads and/or decreasing the number of requests in an msearch batch; I just want to find the right balance between execution time and price. Any help is appreciated.

Thanks

Hi @aldoorozco, welcome to the community, and thanks for trying Elastic Cloud.

So this is without much analysis...

This may seem odd, but if you are using gcp.es.datahot.n2.68x32x45, which I think you are... I might not use coordinators; they may actually be slowing you down (long conversation, but I would try without coordinators).

Then take another look...

Hi @stephenb. Thanks. Glad to be here.

I just tried that and I'm getting pretty much the same result, unfortunately.

I was curious, though. I tried both bumping the instances to 64 CPUs and enabling autoscaling, but nothing seems to truly make a difference. Looking at the charts, it seems like requests are only being routed to 2 nodes, and the rest are just sitting idle. I am using the Elasticsearch Python client with the cloud ID, so I assume something in Elastic Cloud should load balance to the proper nodes and increase capacity. Is that the case?

If you had any other suggestions, I would love to hear them :pray:

Thanks!

Ok but probably good to reduce the variables...

How many primary and replica shards?

You could try increasing the primary shard count, which should give you more parallelism.

For example, say you have 3 nodes and this is about load: try setting the primary shard count to 3. This is more about load parallelism.

When you bump to 64 CPUs, it is actually 2 x 32-CPU machines.

If you have 6 machines try setting the primary shards to the number of nodes.

If this is on the search side only you can also try increasing the replicas to 2 or 3 to increase search throughput (tradeoffs on loading)

But in the end, you will probably need to tune more of the settings

Did you look at the tuning docs?

Thanks for the doc! Yeah, I've looked at it a few times, and it's pretty extensive.

Even though I have 6 nodes (3 zones x 64 CPUs, or 3 x 2 x 32-CPU VMs), it seems like the index just has 1 primary and 1 replica shard (both seem to be 20GB in size).

I looked everywhere, but I just cannot find where that is configured (I assume it would be set when I write using the Spark connector, but I can't find the exact config). That seems to be what's slowing me down after all.

Do you know how/where to specify that primary shard and replica shard count? fwiw, my pyspark code looks something like

        (
            df.write
            .format("org.elasticsearch.spark.sql")
            .option("es.resource", "myindex")
            .save()
        )

Create an index template and set it in that.

You can only set the number of primary shards when the index is created, but you can change the replicas at any time.

Or just manually create the index with a mapping and set the index.number_of_shards
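Something like this with the Python client (untested sketch; index name and credentials are placeholders, and the client calls are commented out):

```python
# number_of_shards is fixed at creation time; one primary per data node
# for load parallelism. number_of_replicas can be changed at any time.
index_settings = {
    "number_of_shards": 6,
    "number_of_replicas": 1,
}

# from elasticsearch import Elasticsearch
# es = Elasticsearch(cloud_id=ES_CLOUD_ID, basic_auth=(ES_USER, ES_PASSW))
# es.indices.create(index="myindex", settings=index_settings)
#
# Later, to change replicas without reindexing:
# es.indices.put_settings(index="myindex", settings={"number_of_replicas": 2})
```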

Sorry I know nothing about spark code.

Thanks for the pointers, Stephen.

I created the index template using the Python ES client [1], and writing sped up significantly (from 20 mins to 5). On the other hand, queries (which take 95+% of the pipeline execution time) didn't really improve all that much: I'm still getting the same errors using 140 threads, 20 searches each.

Are there any other tweaks available? Since this is related to thread pools, I wonder if there is a way to increase them on Elastic Cloud and if that's a good idea.

[1] Leaving the python code here in case anyone reading this thread finds it useful:

    from time import sleep

    from elasticsearch import Elasticsearch
    from elasticsearch.client import IndicesClient

    es = Elasticsearch(
        cloud_id=ES_CLOUD_ID,
        basic_auth=(ES_USER, ES_PASSW),
    )

    client = IndicesClient(es)

    if not client.exists_index_template(name="template").body:
        template = {
            "settings": {
                "number_of_shards": 6,
                "number_of_replicas": 3
            }
        }
        client.put_index_template(name="template", create=True, index_patterns=[f"{index}*"], template=template)

    if mode == "overwrite" and client.exists(index=index).body:
        client.delete(index=index)

    # Wait for index to be fully deleted
    while client.exists(index=index).body:
        sleep(0.5)

    (
        df.write.mode(mode)
        .format("org.elasticsearch.spark.sql")
        .option("es.resource", f"{index}/")
        .save()
    )

How many indices are you querying? How many primary and replica shards are these configured with? How large are your indices?

Hi Christian,

My ES deployment is 96 CPUs (9 x 32-CPU instances, across 3 zones). I have just 1 index, with 9 primary and 5 replica shards. Each shard is 1.3GB in size. I tried to follow Stephen's idea of 1 primary shard per node, and added multiple replica shards to see if that improved search performance.

How much RAM do your nodes have?

64 GB

In order to optimize for high query concurrency, it is important to make sure that all data can fit into the operating system page cache so that disk I/O can be minimised. That seems to be the case here, as each node would hold approximately 12GB of indexed data and the heap will be less than 32GB in size. But if you are continuously updating the index, this will affect performance and potentially impact the cache.

The second step is to use your threadpools as efficiently as possible. The best way to do this in your case would be to have a single primary shard and 8 replica shards. This assumes that the query latency of a single shard holding the full data set is sufficient. If that is not the case, split the index into as few primary shards as possible until latency is acceptable. The more primary shards you have, the fewer queries can be efficiently served concurrently.

I would recommend setting it up this way and then testing a heavy query load (without updates) against it to see how it performs. You can then run the same test while updating to quantify the impact. If queries do not need access to prior updates and concurrent indexing and querying reduces performance significantly, it may be worth queueing up all updates, e.g. on file or in Kafka, and applying them only once all querying and processing has completed.
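Put differently: with copies spread evenly, the total number of shard copies (primaries x (replicas + 1)) should equal the node count. A quick illustrative helper:

```python
def replica_count(num_nodes, num_primaries=1):
    """Replicas per primary so that every node holds exactly one shard copy.
    Total copies = num_primaries * (replicas + 1) = num_nodes."""
    assert num_nodes % num_primaries == 0, "copies should divide evenly across nodes"
    return num_nodes // num_primaries - 1

print(replica_count(9))     # 1 primary on 9 nodes -> 8 replicas
print(replica_count(9, 3))  # 3 primaries on 9 nodes -> 2 replicas of each
```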


Thanks Christian. Yeah, I am not updating the index at all, just loading it once and running the searches. My main bottleneck is running the msearch queries over those millions of records in a timely fashion.

Reducing to 1 primary and 8 replicas made queries run significantly faster. Now the pipeline processes 500K records in about 15 mins using 80 threads. Increasing primary shards to 2 wound up slowing things down, so I'm keeping 1 primary and [n - 1] replicas.

I have a follow-up question. I increased the deployment size from 9 to 15 nodes to check if that yields snappier / more concurrent searches. I used the same ratio -- 1 primary and 14 replica shards -- but now I am getting errors about shards not being assigned after loading. I didn't have this problem while working with 9 nodes. Is there a setting I need to change to fix this?

GET /_cluster/allocation/explain
...
  "node_allocation_decisions": [
    {
      "node_decision": "throttled",
      "transport_address": "10.42.6.32:19369",
      "weight_ranking": 1,
      "node_name": "instance-0000000010",
      "node_id": "m8LVnIedRRK01p5mkXeoZg",
      "deciders": [
        {
          "decision": "THROTTLE",
          "explanation": "reached the limit of outgoing shard recoveries [2] on the node [UvB-uJ_ZQJGauHfUHtQ3fA] which holds the primary, cluster setting [cluster.routing.allocation.node_concurrent_outgoing_recoveries=2] (can also be set via [cluster.routing.allocation.node_concurrent_recoveries])",
          "decider": "throttling"
        }
      ],
      "node_attributes": {
        "server_name": "instance-0000000010.26d68f846c7b4ed980dfe92f8bec3da4",
        "availability_zone": "us-central1-c",
        "region": "unknown-region",
        "xpack.installed": "true",
        "instance_configuration": "gcp.es.datahot.n2.68x32x45",
        "logical_availability_zone": "zone-2",
        "data": "hot"
      }

What version are you running? If it's 8.2 or later, the top-level summary of this output should say something like this:

Elasticsearch is currently busy with other activities. It expects to be able to allocate this shard when those activities finish. Please wait.

This is correct, you just need to wait for the shards to finish allocating.

Hi David,

I'm on 8.6.1. I deleted the deployment ('cause it was expensive to keep it alive while I was out). Will create a new one and give it another try.

Something I don't understand is why the larger cluster (15 nodes) presents these problems, whereas the smaller one (9 nodes) did not. I would assume a larger cluster has higher throughput. I'm not sure what those "other activities" mean in this case, as I am not running any searches or other commands while the load is in progress.

"Other activities" in this context is just allocating other shard copies. When you create the index, it's best to wait for it to reach green health before you start to index into it.
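A minimal way to do that from the Python client (a sketch; the `wait_for_green` helper is illustrative, and the call returns the health response rather than raising on timeout, so check `timed_out` in the result):

```python
def wait_for_green(es, index, timeout="10m"):
    """Block until every shard copy of `index` is assigned (green health).
    `es` is an Elasticsearch client instance."""
    return es.cluster.health(index=index, wait_for_status="green", timeout=timeout)

# wait_for_green(es, "myindex")  # then start the bulk load
```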


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.