Difference between `index.refresh_interval` and `es.batch.write.refresh`

I see two similar settings: one in ES-Hadoop and the other in the index configuration.

  1. Is it recommended to set `index.refresh_interval` to `-1` when `es.batch.write.refresh=true`?
  2. When running a Spark job, how often does the refresh API get called? Is it per Spark job, stage, or task? Or something smaller?
    1. I see the following code. Is my understanding correct that Structured Streaming uses EsRDDWriter, and that the scope of the writer (which calls the refresh API when closing) is a Spark task?
    2. Is the refresh API call per index (or per shard)?
    3. When multiple tasks finish around the same time, is it possible that the code above calls the refresh API multiple times simultaneously? Could that cause a performance issue on the ES side?
    4. Is there a known performance issue with ES-Hadoop's refresh call when there's high parallelism?
  3. I'm running into the following problem: I'm seeing read timeouts on refresh API calls (the stack trace below shows `/_refresh`) when ingesting to ES using Spark with the default setting `es.batch.write.refresh=true`. It doesn't seem to be related to bulk request size, because when I set `es.batch.write.refresh=false` and `index.refresh_interval='30s'` I don't get the timeouts. What's the best practice for configuring refresh when writing a Spark ingest job?
19/09/03 20:37:50 TRACE NetworkClient: Caught exception while performing request [<redacted>:9200][/_refresh] - falling back to the next node in line...
java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:170)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
	at sun.security.ssl.InputRecord.read(InputRecord.java:503)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
	at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
	at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
	at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
	at org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
	at org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
	at org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
	at org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
	at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
	at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
	at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
	at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
	at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:489)
	at org.elasticsearch.hadoop.rest.pooling.TransportPool$LeasedTransport.execute(TransportPool.java:235)
	at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:115)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:398)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:362)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:366)
	at org.elasticsearch.hadoop.rest.RestClient.refresh(RestClient.java:267)
	at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.close(BulkProcessor.java:550)
	at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:219)
	at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:121)
	at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply(EsRDDWriter.scala:60)
	at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply(EsRDDWriter.scala:60)
	at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:128)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:131)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:129)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:129)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
	at org.apache.spark.scheduler.Task.run(Task.scala:125)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
19/09/03 20:37:50 ERROR NetworkClient: Node [<redacted>:9200] failed (Read timed out); selected next node [<redacted>:9200]

Hi @danielyahn, thanks for posting your questions here. I'll try to answer them in order.

Is it recommended to set `index.refresh_interval` to `-1` when `es.batch.write.refresh=true`?

It depends (tm).

The `index.refresh_interval` setting on Elasticsearch controls how often the index is refreshed for search purposes. A refresh usually involves closing out the underlying Lucene writer for a segment and making that segment available to the search system. The default is 1 second, and there are optimizations that skip refreshes for indices that aren't searched often until traffic picks up. If you have SLAs around how quickly indexed data must become visible to search, you would normally set this value close to or below that time. For high rates of ingestion, we usually suggest setting it much higher. How much higher? It depends (tm) again. This is normally tune-and-test territory to find the right amount of time.
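For illustration, here is how the refresh interval might be adjusted via the index settings API. The host, index name, and interval values below are placeholders, not recommendations:

```shell
# Raise the refresh interval during a heavy ingest (placeholder host/index):
curl -X PUT "http://localhost:9200/my-index/_settings" \
  -H 'Content-Type: application/json' \
  -d '{"index": {"refresh_interval": "30s"}}'

# Or disable time-based refresh entirely during a bulk load:
curl -X PUT "http://localhost:9200/my-index/_settings" \
  -H 'Content-Type: application/json' \
  -d '{"index": {"refresh_interval": "-1"}}'
```

If you disable it, remember to restore the interval (e.g. back to `"1s"`, or `null` for the default) once the bulk load finishes.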

The `es.batch.write.refresh` setting is indeed part of ES-Hadoop, and it tells the connector whether to perform a refresh operation after tasks complete. If you are tuning the refresh interval for an index, it is usually advised to disable this setting (`=false`), since ES-Hadoop submits refresh operations to the cluster without regard for the index's refresh interval.
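As a sketch, disabling the connector's refresh from a Spark job might look like this. The host, class, and jar names are placeholders; note that ES-Hadoop settings passed through SparkConf take a `spark.` prefix:

```shell
# Disable the connector's post-task refresh; the index's own refresh_interval
# then controls search visibility (all names below are placeholders):
spark-submit \
  --conf spark.es.nodes=es-host:9200 \
  --conf spark.es.batch.write.refresh=false \
  --class com.example.IngestJob \
  my-ingest-job.jar
```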

When running a Spark job, how often does the refresh API get called? Is it per Spark job, stage, or task? Or something smaller?

If enabled, ES-Hadoop performs a refresh operation on all indices it has written to at the conclusion of the task performing the write. For Spark, this is usually at the task level.

Is the refresh API call per index (or per shard)?

The refresh call is per index, and it refreshes all shards under that index. This makes the refresh operation somewhat costly, which is another reason we suggest turning it off if you are tuning the refresh interval at all.
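The connector's call is equivalent to hitting the index-level refresh endpoint, which touches every shard of that index (host and index name are placeholders):

```shell
# Refresh a single index, i.e. all of its shards:
curl -X POST "http://localhost:9200/my-index/_refresh"
```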

When multiple tasks finish around the same time, is it possible that the code above calls the refresh API multiple times simultaneously? Could that cause a performance issue on the ES side?

This is correct. Since multiple tasks may finish around the same time, it is often the case that multiple refresh operations are fired off in rapid succession. For most batch workflows this is not a problem, since the job has concluded and a refresh is usually appropriate at that point; but for workflows that expect to keep writing after a task completes, it is suboptimal. Refreshing too often causes Elasticsearch to close out segments with fewer documents in them, incurring a higher I/O cost as it writes many small files to disk.

The default for this setting in ES-Hadoop is informed more by the desire for an easy out-of-the-box experience that can be tuned later. Thus we set it to true by default.

Is there a known performance issue with ES-Hadoop's refresh call when there's high parallelism?

As mentioned above, rapid calls to refresh while indexing is ongoing incur higher I/O costs as Elasticsearch writes files to make documents available for search. In high-parallelism environments, it is suggested to disable automatic refresh in ES-Hadoop and configure the refresh interval on the index to a reasonable amount of time.

I'm running into the following problem: I'm seeing read timeouts on refresh API calls when ingesting to ES using Spark with the default setting `es.batch.write.refresh=true`. It doesn't seem to be related to bulk request size, because when I set `es.batch.write.refresh=false` and `index.refresh_interval='30s'` I don't get the timeouts. What's the best practice for configuring refresh when writing a Spark ingest job?

If the job is not failing, the defaults are usually fine. If you notice a slowdown due to higher I/O load on Elasticsearch, that is the point at which to tune the refresh interval and the automatic refresh. Since your job is failing with timeouts, you should definitely disable ES-Hadoop's automatic refresh and rely on the index's refresh interval to perform the refresh operation. This makes even more sense when you are ingesting from Structured Streaming, since Spark's streaming architecture is micro-batch based: Elasticsearch can batch up documents for refresh on its own across multiple micro-batches, rather than receiving a constant stream of refresh operations.
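Putting that together, one possible "refresh on an interval, not per task" setup looks like the following sketch. All host, index, and jar names are placeholders, and the 30s interval is just an example starting point:

```shell
# 1) Let Elasticsearch control the refresh cadence on the target index:
curl -X PUT "http://localhost:9200/my-index/_settings" \
  -H 'Content-Type: application/json' \
  -d '{"index": {"refresh_interval": "30s"}}'

# 2) Disable the connector's per-task refresh in the Spark ingest job:
spark-submit --conf spark.es.batch.write.refresh=false my-streaming-job.jar
```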


Thanks for your detailed answers. Loving it!

I have a few follow-up questions / suggestions.

This is correct. Since multiple tasks may finish around the same time, it is often the case that multiple refresh operations are fired off in rapid succession.

I think a potential improvement would be refining the scope of the refresh call, perhaps to a Spark stage, so that the refresh call is only executed once after all parallel tasks finish.

The default for this setting in ES-Hadoop is informed more by the desire for an easy out-of-the-box experience that can be tuned later. Thus we set it to true by default.

The default setting of `index.refresh_interval` in Elasticsearch is 1s, as shown here. Therefore, I would argue the opposite: I believe `es.batch.write.refresh` should be false by default. The refresh rate is controlled by Elasticsearch by default, so there's no need for ES-Hadoop users to enable refresh unless they have intentionally disabled Elasticsearch's default setting.

Lastly, one minor point: could this discussion be cleaned up and added to the official docs, so that our findings are shared with the larger community?

@james.baiera any thought?

This is an interesting idea. Worth investigating imo.

I'm not too attached one way or the other to the default behavior here. I know that historically we've wanted our default values to reflect a positive out-of-the-box experience, and having the data guaranteed to be available for search after a job completes was most likely a feature that made sense to enable by default at the time. That said, disabling it is a common enough suggestion that it might as well be off by default. If you open an issue for it on GitHub, I'll mark it as discuss and we'll come to a conclusion on the best route forward. Just note that since it's a change to a default value, it's considered a breaking change and would need to wait for 8.0.