Elasticsearch Hadoop Connector - Spark: Issues While Saving to ES

(Mahadevan Krishnan) #1

I am using the 5.0.0 alpha 2 version of Elasticsearch along with the corresponding ES-Hadoop connector. The Spark version is 1.6.1, and I am running on EMR.

Here is the configuration of the ES nodes:

1 Master Node - m4.large
10 Data Nodes - c4.4xlarge
1 Client Node - m4.large

I was trying to load 7 days' worth of log data, split into 600 partitions. The job uses 6 Spark executors with 4 cores each, and each partition holds roughly 127315 records.

I get the following error, with no additional message:
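
To put the figures above in perspective, here is a rough back-of-the-envelope sketch. It assumes one Spark task per core (the default `spark.task.cpus` of 1) and the connector's default `es.batch.size.entries` of 1000; neither is stated in the post, so treat the results as estimates only. Since the byte limit (`es.batch.size.bytes`) can trigger a flush earlier, the bulk request count is a lower bound.

```python
import math

# Figures taken from the post
partitions = 600
records_per_partition = 127315      # rough per-partition count
executors = 6
cores_per_executor = 4

# Assumption: connector default for es.batch.size.entries, left unchanged
batch_entries = 1000

# Total documents the job tries to index
total_records = partitions * records_per_partition

# Assumption: one task per core, so this many partitions write to ES at once
concurrent_writers = executors * cores_per_executor

# Lower bound on bulk requests per partition (the byte limit may flush sooner)
bulk_requests_per_partition = math.ceil(records_per_partition / batch_entries)

print("total records:", total_records)
print("concurrent writers:", concurrent_writers)
print("bulk requests per partition (at least):", bulk_requests_per_partition)
```

Under these assumptions, about 24 tasks send bulk requests to the cluster at the same time.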

at org.elasticsearch.hadoop.rest.RestClient.extractError(RestClient.java:229)
at org.elasticsearch.hadoop.rest.RestClient.retryFailedEntries(RestClient.java:195)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:166)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:224)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:247)
at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:266)
at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:130)
at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:42)
at org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:68)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:79)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:77)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Sometimes I have also seen "Could not write all entries (maybe ES was overloaded?)".

We are doing a POC that will load about 8 billion log entries (roughly 180 days' worth of log data). So far we have not managed to load even 7 days' worth of data.

Can someone point me in the right direction and tell me what we are doing wrong? What are the best practices for sizing? I have gone through the write performance section of the ES-Hadoop connector documentation and have left all the settings at their defaults.

(Costin Leau) #2

What version of ES are you using?
The error looks like a bug (I've raised this issue), and checking the code indicates as much.
Do you have any logs available? (See the docs on how to enable them.)

(Mahadevan Krishnan) #3

Hi Costin,

Thanks for replying to my question. I was using the Elasticsearch v5.0 Alpha 2 release. I do not get these errors when running a smaller load; they only appear when I try to push a large volume. Because of that, I was wondering whether enabling Trace or Debug logging is a good idea, since the job would then spend a considerable amount of time logging. Do you suggest enabling it and running the job again?

(Costin Leau) #4

The message is likely caused by overload. I've fixed the bug and will double-check the message structure in ES again, as it might have changed between Alpha 1, 2 and 3.

I've pushed a fresh nightly build with the fix.
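
If overload is indeed the cause, one option is to make each task write more gently and retry for longer. The sketch below only builds a dictionary of ES-Hadoop batch settings; the setting names are the connector's documented ones, but the values are illustrative assumptions rather than recommendations, and the helper `es_write_options` and the `logs/entry` resource are hypothetical names used for this example.

```python
def es_write_options(batch_entries=500, batch_bytes="1mb",
                     retry_count=6, retry_wait="30s"):
    """Build ES-Hadoop write settings (values are illustrative only)."""
    return {
        # Flush a bulk request after this many documents...
        "es.batch.size.entries": str(batch_entries),
        # ...or after this many bytes, whichever comes first
        "es.batch.size.bytes": batch_bytes,
        # How many times to retry a bulk request that ES rejected
        "es.batch.write.retry.count": str(retry_count),
        # How long to wait between those retries
        "es.batch.write.retry.wait": retry_wait,
        # Skip the index refresh after each bulk write
        "es.batch.write.refresh": "false",
    }

options = es_write_options()
for key, value in sorted(options.items()):
    print(key, "=", value)

# With PySpark and the connector on the classpath, the dictionary could be
# passed along these lines (the "logs/entry" index/type is hypothetical):
#   df.write.format("org.elasticsearch.spark.sql").options(**options).save("logs/entry")
```

Reducing the number of tasks that write at the same time (fewer executor cores, or fewer partitions processed in parallel) is another way to lower the pressure on the cluster.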
