I see two similar refresh-related settings: one in ES-Hadoop and the other in the index configuration.
- Is it recommended to set `index.refresh_interval` to `-1` when `es.batch.write.refresh=true`?
- When running a Spark job, how often does the refresh API get called? Is it once per Spark job, stage, or task? Or something more granular?
- I see the following code (sketched after this list). Is my understanding correct that Structured Streaming uses `EsRDDWriter`, and that the scope of the writer (which calls the refresh API when closing) is a Spark task?
- Is the refresh API called per index or per shard?
- When multiple tasks finish around the same time, is it possible that this code calls the refresh API multiple times concurrently? Could that cause a performance issue on the ES side?
- Is there a known performance issue with ES-Hadoop's refresh call under high parallelism?
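This is the code I'm referring to, roughly, from `EsRDDWriter.scala` (paraphrased from the es-hadoop source, so names and exact lines may differ by version; the task-completion listener is the frame at `EsRDDWriter.scala:60` in the trace below):

```scala
// Paraphrase of EsRDDWriter.write in elasticsearch-hadoop (not verbatim).
def write(taskContext: TaskContext, data: Iterator[T]): Unit = {
  // One writer (RestService.PartitionWriter) per Spark task/partition.
  val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

  // Closing the writer on task completion runs BulkProcessor.close(),
  // which calls RestClient.refresh() when es.batch.write.refresh=true.
  taskContext.addTaskCompletionListener((ctx: TaskContext) => writer.close())

  while (data.hasNext) {
    writer.repository.writeToIndex(processData(data))
  }
}
```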
- I'm running into the following problem. I'm seeing read timeouts on refresh API calls when ingesting into ES from Spark with the default setting `es.batch.write.refresh=true`. It doesn't seem to be related to tuning the bulk request size, because when I set `es.batch.write.refresh=false` and `index.refresh_interval='30s'` instead, I don't get the timeouts. What's the best practice for configuring refresh when writing a Spark ingest job?
19/09/03 20:37:50 TRACE NetworkClient: Caught exception while performing request [<redacted>:9200][/_refresh] - falling back to the next node in line...
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
at org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
at org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
at org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
at org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:489)
at org.elasticsearch.hadoop.rest.pooling.TransportPool$LeasedTransport.execute(TransportPool.java:235)
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:115)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:398)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:362)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:366)
at org.elasticsearch.hadoop.rest.RestClient.refresh(RestClient.java:267)
at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.close(BulkProcessor.java:550)
at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:219)
at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:121)
at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply(EsRDDWriter.scala:60)
at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply(EsRDDWriter.scala:60)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:128)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:131)
at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:129)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:129)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
at org.apache.spark.scheduler.Task.run(Task.scala:125)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/09/03 20:37:50 ERROR NetworkClient: Node [<redacted>:9200] failed (Read timed out); selected next node [<redacted>:9200]