Loading data from Spark to Elasticsearch using elasticsearch-hadoop

Hi all,

I am trying to write an RDD of strings (JSON documents) to Elasticsearch. My cluster has 3 nodes, and I have set ES_HEAP_SIZE to about 15g on all three machines.
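
For reference, this is roughly how I am writing the data (hostnames, input path, and index/type names below are placeholders, not my real ones):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._   // adds saveJsonToEs to RDDs

val conf = new SparkConf()
  .setAppName("es-bulk-load")                // placeholder app name
  .set("es.nodes", "node1,node2,node3")      // placeholder hostnames
  .set("es.port", "9200")
val sc = new SparkContext(conf)

// each line of the input file is one JSON document
val jsonRdd = sc.textFile("/path/to/data.json").repartition(3)  // placeholder path
jsonRdd.saveJsonToEs("myindex/mytype")       // placeholder index/type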

Before starting the job, I set the following index settings:
{
  "index": {
    "number_of_replicas": 0,
    "refresh_interval": "-1"
  }
}
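
I apply them with a plain HTTP PUT to the index settings endpoint before the job starts, roughly like this (host and index name are placeholders):

import java.net.{HttpURLConnection, URL}

// PUT the settings shown above to <host>:9200/<index>/_settings
val url = new URL("http://node1:9200/myindex/_settings")   // placeholders
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("PUT")
conn.setDoOutput(true)
conn.setRequestProperty("Content-Type", "application/json")
val body = """{"index": {"number_of_replicas": 0, "refresh_interval": "-1"}}"""
conn.getOutputStream.write(body.getBytes("UTF-8"))
println(s"settings update returned HTTP ${conn.getResponseCode}")
conn.disconnect()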

I have tried many combinations of es.batch.size.bytes and the number of partitions in my RDD, but after some time (depending on the configured values) the ingestion slows down and the job ultimately throws:

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[:9200, :9200, :9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:426)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:153)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:225)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:248)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:187)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:163)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:49)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

I have enabled DEBUG logging for the Spark job, and I can see that ingestion is initially very fast but then stalls at

16/08/03 14:00:28 DEBUG EntityEnclosingMethod: Request body sent

for longer and longer durations, and after some time it throws the exception above.

I have tried with a 1mb batch size and a single RDD partition, with the same result.
Most recently I tried a 200kb batch size with 3 RDD partitions (i.e. 3 tasks, so at most 3 * 200kb = 600kb in flight), and I still hit the same issue.

From these experiments it looks like reducing the batch size only delays the error; the end result is always the same.
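
For completeness, this is roughly how I pass the bulk settings on the write (the retry values are the es-hadoop 2.3 defaults as far as I know; everything besides es.batch.size.bytes is shown only for context):

// bulk-related knobs; es.batch.size.bytes is the one I have been varying
val tuned = Map(
  "es.batch.size.bytes"        -> "200kb",  // last attempt; earlier runs used 1mb
  "es.batch.size.entries"      -> "1000",   // default, left untouched
  "es.batch.write.retry.count" -> "3",      // default
  "es.batch.write.retry.wait"  -> "10s"     // default
)
jsonRdd.saveJsonToEs("myindex/mytype", tuned)   // placeholder index/type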

I am using elasticsearch-hadoop 2.3.3 and Spark 1.6.2.

I have also gone through the GC logs but don't see any unexpected pauses there. I am using dynamic mapping to load the data, 86K documents in total. A field in the JSON data is used as the routing value, and the same field is used as the '_type' for the index.
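
Concretely, the routing/type setup looks roughly like this ('category' stands in for the real field name):

// route each document on a field from the JSON, and use the same
// field as the _type via a {field} pattern in the resource string
val routed = Map("es.mapping.routing" -> "category")   // placeholder field
jsonRdd.saveJsonToEs("myindex/{category}", routed)     // placeholder index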

Note that I am running the Spark job in local mode on one of the Elasticsearch nodes (not the master node).

I need to understand why ingestion slows down so significantly after a short while and never completes. What am I doing wrong here?

Any pointers would be highly appreciated.

Best Regards
Yash
