Stability issues when uploading Databricks tables to Elasticsearch indices (circuit_breaking_exception)

Hi!

We recently got switched from a large cluster (8 nodes, 21 GB JVM heap) to a smaller one (3 nodes, 9 GB JVM heap), and now we are getting errors when trying to reupload our Elasticsearch indices. The job occasionally passes, but most runs fail with a circuit breaking exception. We didn't have this issue on the larger cluster.

circuit_breaking_exception: [parent] Data too large, data for [<http_request>] would be [4080680148/3.8gb], which is larger than the limit of [4080218931/3.7gb], real usage: [4080679872/3.8gb], new bytes reserved: [276/276b], usages [inflight_requests=630/630b, request=0/0b, fielddata=3493/3.4kb, eql_sequence=0/0b, model_inference=0/0b]

As we understand it, the circuit breaker feature is there to prevent an out-of-memory exception, so it is good that it is doing its job. What I don't understand, however, is how we're supposed to get the upload to stay within the memory limits.

The official documentation mentions decreasing the bulk size, but changing these parameters to half of their default values does not seem to have any impact.

This is our code, using the Python DataFrame API. The documentation mostly covers the legacy RDD API, but other sources hint that the DataFrame writer also uses the bulk API under the hood.

(data.write.format("org.elasticsearch.spark.sql")
    .options(**get_es_conf())
    .mode("overwrite")
    .save(index))
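
For context, get_es_conf() (not shown) just returns a dict of connector options. A rough sketch of it, with placeholder connection values and the halved bulk settings, assuming the standard es-hadoop option names:

def get_es_conf():
    # Sketch only: host and credentials are placeholders.
    return {
        "es.nodes": "https://our-es-endpoint.example.com",
        "es.port": "9243",
        "es.nodes.wan.only": "true",
        "es.net.http.auth.user": "elastic",
        "es.net.http.auth.pass": "<password>",
        # Halved from the defaults (1000 entries / 1mb per bulk request)
        "es.batch.size.entries": "500",
        "es.batch.size.bytes": "0.5mb",
    }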

The Databricks job cluster:

Driver: Standard_DS3_v2 · Workers: Standard_DS3_v2 · 2 workers · 12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)

The workflow task has a dependency on the driver library org.elasticsearch:elasticsearch-spark-30_2.12:8.6.2.

The Elasticsearch cluster is running Elasticsearch version 8.6.2.

How can we configure this so that it successfully uploads the new data without triggering the circuit breaker?

Hi @landlord_matt. How many Spark tasks do you have writing to Elasticsearch at once? And what exactly are your values for es.batch.size.bytes and es.batch.size.entries? And are you certain that your modified values are getting passed in to Spark?

Thanks for the reply.

How many spark tasks do you have writing to Elasticsearch at once?

I'm not entirely sure how to determine this. In later versions of Spark/Databricks this has been automated and abstracted away from the user. I'm not sure how to set it to a fixed number or how to see how many it uses by default. When running the notebook in interactive mode it seems to be between 4 and 8. I don't know how to determine it for a completed job run.
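
To make this more concrete, here is a rough sketch (illustrative values) of how I could check the partition count and cap the number of concurrent writers; with 2 Standard_DS3_v2 workers (4 cores each) I assume at most 8 tasks write at the same time:

# Each partition becomes one write task; tasks run in parallel up to the
# total number of executor cores on the cluster.
num_partitions = data.rdd.getNumPartitions()
print(f"The write will be split into {num_partitions} tasks")

# Coalescing to fewer partitions caps the number of concurrent bulk writers
# without a full shuffle (4 here is just an example value).
(data.coalesce(4).write.format("org.elasticsearch.spark.sql")
    .options(**get_es_conf())
    .mode("overwrite")
    .save(index))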

And what exactly are your values for es.batch.size.bytes and es.batch.size.entries?

I tried with 500 entries and 0.5mb

And are you certain that your modified values are getting passed in to spark?

I'm sure they are going into options, but I don't know what happens after that. Is it somehow possible to see the request on the Elasticsearch side?
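
Something like this is roughly what I had in mind for watching it from the ES side while the job runs (placeholder endpoint and credentials), using the node stats and cat thread pool APIs:

import requests

ES_URL = "https://our-es-endpoint.example.com:9243"
AUTH = ("elastic", "<password>")

# Parent circuit-breaker usage per node
stats = requests.get(f"{ES_URL}/_nodes/stats/breaker", auth=AUTH).json()
for node in stats["nodes"].values():
    parent = node["breakers"]["parent"]
    print(node["name"], parent["estimated_size"], "/", parent["limit_size"])

# Active, queued and rejected bulk requests on the write thread pool
print(requests.get(
    f"{ES_URL}/_cat/thread_pool/write?v&h=node_name,active,queue,rejected",
    auth=AUTH,
).text)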

The infra team tried setting us up with a new instance running Elastic Cloud on Azure instead of the official Azure Elasticsearch resource, and upgraded that version to 8.9.0. In the initial run with default settings on the new instance we didn't get the memory issue, but instead got a timeout error after 3 hours.

In general the official Elasticsearch documentation is centered around the RDD API, but recent versions of Spark recommend using the high-level DataFrame API instead of the low-level RDD API (source). Maybe you should revisit the Spark documentation to make it easier to interact with the increasingly popular Databricks?

Update: The documentation takes PRs, so I guess I could send a proposal.

We don't log that anywhere that I'm aware of, but if you see it in your Spark configuration in your job history then it's fine.
The reason I asked how many Spark tasks you're using is that if each one is sending relatively small batches (for example, the one in your error message was just a few bytes), then it is unlikely that a single large batch is your problem. It looks like something else is holding onto memory. I assume the only thing going on with this cluster is the data load from Spark, so I'm guessing that you have a large number of tasks, each sending a small amount of data, which adds up to a lot in memory at once. If you have access to your Spark job history server, you'll be able to click around to see how many tasks were running at once (I think via the Executors tab).

If you open a support ticket with the Elastic team we could look into what is going on in this cluster. What exactly is timing out? Can you extend that timeout to see if it ever completes? How long do you expect the job to take?
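
If it turns out to be the connector that is timing out, raising the es-hadoop timeout and retry settings is one thing to try. A sketch with illustrative values, added on top of your existing options:

es_conf = {
    **get_es_conf(),
    "es.http.timeout": "5m",            # HTTP request timeout (default 1m)
    "es.batch.write.retry.count": "6",  # retries after a rejected bulk (default 3)
    "es.batch.write.retry.wait": "60s", # wait between bulk retries (default 10s)
}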

Good news! Increasing the memory so that the JVM heap is 16 GB solved the issue, and the job now completes faster (1 h 25 min) than on the old 21 GB cluster (1 h 55 min). So we upgraded the memory and the Elasticsearch version to 8.9.0. I'm not sure if the switch to Elastic Cloud is related.

It looks like I can see how many tasks ran, but not how many ran at the same time, in the Executors tab of the Spark UI :frowning:
