I'm using Spark 1.6 and Elasticsearch 2.2.0.
When there are too many batches, I get this exception:
2016-08-16 09:20:54,237 [Executor task launch worker-1] ERROR org.apache.spark.executor.Executor - Exception in task 2.0 in stage 28.0 (TID 307)
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:196)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:379)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopTransportException: java.net.BindException: Address already in use
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:121)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:414)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:418)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:122)
at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:564)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:184)
... 10 more
Caused by: java.net.BindException: Address already in use
at java.net.PlainSocketImpl.socketBind(Native Method)
at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
at java.net.Socket.bind(Socket.java:631)
at java.net.Socket.<init>(Socket.java:423)
at java.net.Socket.<init>(Socket.java:280)
at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80)
at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122)
at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:468)
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:104)
... 16 more
I would avoid saving Spark Streaming data by calling doSaveToEs inside foreachRDD. That runs the connector in its regular batch mode, and in a streaming context the rate at which it opens connections to Elasticsearch makes the Spark workers run out of connection resources: the connector is constantly setting up and tearing down connections every few seconds, once for each RDD that comes in.
To fix this, I recommend using the connector's native Spark Streaming support. It has built-in facilities for safe connection pooling across multiple jobs running in parallel, which should mitigate connection-resource exhaustion while streaming jobs execute.
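For illustration, here is a minimal sketch of the two patterns, assuming elasticsearch-hadoop 5.x or later on the classpath (which ships the native Spark Streaming support); the socket source, index name, and object name are placeholders, not taken from your setup:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.streaming._ // adds saveToEs to DStreams

object StreamingSaveSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("es-streaming-sketch")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
    val ssc = new StreamingContext(conf, Seconds(30))

    val docs = ssc.socketTextStream("localhost", 9999) // hypothetical source
      .map(line => Map("message" -> line))

    // Problematic: every micro-batch runs the connector in batch mode, opening
    // and tearing down its own connections to Elasticsearch.
    // docs.foreachRDD(rdd => EsSpark.saveToEs(rdd, "logs/doc"))

    // Native streaming support: the connector manages connections for the
    // lifetime of the streaming job.
    docs.saveToEs("logs/doc")

    ssc.start()
    ssc.awaitTermination()
  }
}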
I think the reason for using doSaveToEs inside foreachRDD is to change the resource, which may be named by day: with the streaming API the resource is only evaluated once, at the start, on the Spark driver, and the executors cannot change it each time a new batch of the DStream arrives.
You could generate a field on your documents (let's call it "indexDate").
Populate it with the date of the index you want to ingest each document into (using a map call and a function that strips the time element off a date, say).
Configure the save call with a dynamic index name in the resource property, so that the new field determines the index name for each document.
Set the es.mapping.exclude property to reference the new field so that it does not show up in the final document sent to Elasticsearch.
// Assuming that you are using a regular DStream[Map]; determineIndexDate is your own helper that truncates a timestamp to a date:
stream
.map(doc => doc + ("indexDate" -> determineIndexDate(doc("datetime"))))
.saveToEs("index-{indexDate}/type", Map("es.mapping.exclude" -> "indexDate"))
Hi James,
Thanks a lot, your solution works for me. I'd like to share my code:
val conf = logEnv.getSparkConf(YOUR_APP_NAME)
conf.set(ConfigurationOptions.ES_NODES, commonParam.esNodes)
conf.set(ConfigurationOptions.ES_PORT, commonParam.esPort)
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, true.toString)
/* The es.mapping.exclude feature is ignored when es.input.json is specified */
conf.set(ConfigurationOptions.ES_INPUT_JSON, false.toString)
conf.set(ConfigurationOptions.ES_MAPPING_EXCLUDE, "logIdx")
conf.set(ConfigurationOptions.ES_MAPPING_ID, "logId")

val ssc = new StreamingContext(conf, Durations.seconds(commonParam.duration))

for (logParam <- logParams) {
  val lines = YOUR_BIZ_Utils.createStream(...).cache()
  val messages = lines.map(line => {
    try {
      val message: String = new String(line, Charsets.UTF_8)
      val traceLog: YOUR_POJO = JSON.parseObject(message, classOf[YOUR_POJO])
      val logId: String = traceLog.YOUR_BIZ_KEY + "_" + traceLog.ANOTHER_BIZ_KEY
      val log: Map[String, String] = Map(
        "logId" -> logId,
        "logIdx" -> LocalDate.now.toString,
        "tag" -> traceLog.tag,
        ...)
      log
    } catch {
      case e: Exception =>
        e.printStackTrace()
        null
    }
  }).filter(_ != null).cache()

  EsSparkStreaming.saveToEs(messages, logParam.esIndex + "-{logIdx}/" + logParam.esType,
    /* per task instance, multiplied at runtime by the total number of Hadoop tasks running */
    Map(ConfigurationOptions.ES_BATCH_SIZE_BYTES -> "1mb",
        ConfigurationOptions.ES_BATCH_SIZE_ENTRIES -> logParam.batch))
}
If you want to use ES_MAPPING_EXCLUDE to exclude some fields, you have to set ES_INPUT_JSON to false, because, as the comment in the code above says:
The es.mapping.exclude feature is ignored when es.input.json is specified
Accordingly, you should save the data in key-value style (a Map), instead of JSON serialized from a POJO.
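As a minimal sketch of that constraint (field names, index pattern, and connection settings are illustrative, and a plain RDD is used instead of a DStream for brevity):
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds saveToEs to RDDs

object ExcludeFieldSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("exclude-sketch")
      .set("es.nodes", "localhost")
      .set("es.port", "9200"))

    val docs = sc.makeRDD(Seq(
      Map("logId" -> "a_1", "logIdx" -> "2016-08-16", "tag" -> "x")))

    // Key-value documents: the connector can read logIdx for the dynamic index
    // name and strip it from the document that is actually indexed.
    docs.saveToEs("logs-{logIdx}/doc",
      Map("es.mapping.id" -> "logId", "es.mapping.exclude" -> "logIdx"))

    // With es.input.json = true the connector ships pre-serialized JSON strings
    // untouched, so es.mapping.exclude would be silently ignored.
  }
}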
The most important thing in the indexing process is to tune the ES_BATCH_SIZE_ENTRIES parameter, which defaults to 1000. In my opinion, trading off the streaming duration against the task batch size is an art: the time spent indexing each micro-batch should stay just under the batch duration. It is therefore also tied to the number of executor instances.
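Purely as a hypothetical tuning sketch (the numbers and the index pattern are illustrative, not recommendations), reusing the messages DStream from the snippet above:
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.elasticsearch.spark.streaming.EsSparkStreaming

// Illustrative only: with a 30-second batch duration, the goal is that one
// micro-batch is fully indexed in well under 30 seconds, otherwise batches queue up.
// Both settings apply per task and are multiplied by the number of tasks running.
val bulkTuning = Map(
  ConfigurationOptions.ES_BATCH_SIZE_ENTRIES -> "500", // default is 1000
  ConfigurationOptions.ES_BATCH_SIZE_BYTES -> "1mb")   // default is 1mb
EsSparkStreaming.saveToEs(messages, "logs-{logIdx}/doc", bulkTuning)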