Error "Address already in use" when using Elasticsearch with Spark

I'm using Spark 1.6 and Elasticsearch 2.2.0.
When there are too many batches, I get this exception:

2016-08-16 09:20:54,237 [Executor task launch worker-1] ERROR org.apache.spark.executor.Executor - Exception in task 2.0 in stage 28.0 (TID 307)
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:196)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:379)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopTransportException: java.net.BindException: Address already in use
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:121)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:414)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:418)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:122)
at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:564)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:184)
... 10 more
Caused by: java.net.BindException: Address already in use
at java.net.PlainSocketImpl.socketBind(Native Method)
at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
at java.net.Socket.bind(Socket.java:631)
at java.net.Socket.<init>(Socket.java:423)
at java.net.Socket.<init>(Socket.java:280)
at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80)
at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122)
at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:468)
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:104)
... 16 more

I have updated the driver to elasticsearch-hadoop-2.3.6.jar, and I have read this post as well: https://github.com/elastic/elasticsearch-hadoop/issues/570
I'm working with the latest version of elasticsearch-hadoop.

It seems that I get the error when there are a lot of batches from Spark.

The code is very simple:

def createContext(checkpoint: String, kafkaBrokers: Map[String, String], topic: Set[String], esHost: String, indexName: String, batchTime: Int, environment: String): StreamingContext = {
  val sparkConf = new SparkConf()
  sparkConf.set("spark.es.index.auto.create", "true")
  sparkConf.set("es.batch.size.entries", "500")     // I tried tuning these params to fix the situation, but it doesn't work.
  sparkConf.set("es.batch.write.retry.count", "10")
  sparkConf.set("es.batch.write.retry.wait", "60")

  val ssc = new StreamingContext(sparkConf, Seconds(batchTime))
  ssc.checkpoint(checkpoint)

  val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaBrokers, topic)

  directKafkaStream.foreachRDD { rdd =>
    val message = parseLog(rdd, environment)  //Just some random parsing.
    message.saveJsonToEs(indexName + "_{time:YYYY.MM.dd}/logs")
  }
  ssc

}

I have four nodes in my ES cluster and four executors with two cores each.

I just hit the same issue, and my implementation is mostly the same as Guillermo's:

    messages.foreachRDD(rdd => save2Es(rdd, Map(
      ConfigurationOptions.ES_RESOURCE -> {
        logParam.esIndex + "-" + LocalDate.now + "/" + logParam.esType
      }), false))
...
  }

  def save2Es(rdd: RDD[_], cfg: Map[String, String], hasMeta: Boolean) = EsSpark.doSaveToEs(rdd, cfg, hasMeta)

I'm using Spark 1.5.1 and Elasticsearch 5.0.2, and the elasticsearch-spark dependency is:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-13_2.10</artifactId>
  <version>5.0.2</version>
</dependency>

I would avoid saving Spark Streaming data using doSaveToEs inside foreachRDD. The connector itself runs fine this way, but the rate at which it opens connections to Elasticsearch in a streaming context causes the Spark workers to run out of connection resources: the connector ends up setting up and tearing down connections every few seconds, for every RDD that comes in.

To fix this, I recommend using the Spark Streaming native support in the connector. It has facilities built in to enable safe connection pooling across multiple jobs running in parallel, which should mitigate connection resource exhaustion during streaming job executions.
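For reference, here is a minimal sketch of what the streaming-native write looks like (my own illustration, not code from this thread; it assumes an elasticsearch-spark 5.x artifact on the classpath, and the app name, host, index name, and queue-based source are all placeholders):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming._   // brings saveToEs onto DStream

object StreamingToEs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("streaming-to-es")
      .set("es.nodes", "es-host")            // placeholder host
      .set("es.index.auto.create", "true")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder source; in practice this would be the Kafka direct stream.
    val queue = mutable.Queue[RDD[Map[String, Any]]]()
    val stream = ssc.queueStream(queue)

    // The streaming-native writer keeps connection handling inside the
    // connector instead of re-creating writers in every foreachRDD call.
    stream.saveToEs("logs-index/logs")

    ssc.start()
    ssc.awaitTermination()
  }
}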

I think the reason for using doSaveToEs inside foreachRDD is to change the resource, which may be named per day; with the streaming API, the resource string is evaluated only once, at the beginning, on the Spark driver, and the executors cannot change it each time a new batch of the DStream arrives.

@feuyeux A possible fix for this is:

  1. Generate a field on your documents (let's call it "indexDate").
  2. Populate it with the date of the index you want to ingest each document into (using a map call and a function that strips the time element off a date, say).
  3. Configure the save call with a dynamic index name for the resource property, so that the new field determines the index name for each document.
  4. Set the es.mapping.exclude property to reference the new field so that it does not show up in the final document sent to Elasticsearch.
// Assuming that you are using the regular DStream[Map]:
stream
    .map(doc => doc + ("indexDate" -> determineIndexDate(doc("datetime"))))
    .saveToEs("index-{indexDate}/type", Map("es.mapping.exclude" -> "indexDate"))

Hi James,
Thanks a lot, your solution works for me. I'd like to share my code:

    val conf = logEnv.getSparkConf(YOUR_APP_NAME)
    conf.set(ConfigurationOptions.ES_NODES, commonParam.esNodes)
    conf.set(ConfigurationOptions.ES_PORT, commonParam.esPort)
    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, true.toString)
    /*The es.mapping.exclude feature is ignored when es.input.json is specified*/
    conf.set(ConfigurationOptions.ES_INPUT_JSON, false.toString)  
    conf.set(ConfigurationOptions.ES_MAPPING_EXCLUDE, "logIdx")
    conf.set(ConfigurationOptions.ES_MAPPING_ID, "logId")

    val ssc = new StreamingContext(conf, Durations.seconds(commonParam.duration))

    for (logParam <- logParams) {
      val lines = YOUR_BIZ_Utils.createStream(...).cache()
      val messages = lines.map(line => {
        try {
          val message: String = new String(line, Charsets.UTF_8)
          val traceLog: YOUR_POJO = JSON.parseObject(message, classOf[YOUR_POJO])
          val logId: String = traceLog.YOUR_BIZ_KEY + "_" + traceLog.ANOTHER_BIZ_KEY
          val log: Map[String, String] = Map(
            "logId" -> logId,
            "logIdx" -> LocalDate.now.toString,
            "tag" -> traceLog.tag,
            ...)
          log
        } catch {
          case e: Exception => {
            e.printStackTrace()
            null
          }
        }
      }).filter(_ != null).cache()

      EsSparkStreaming.saveToEs(messages, logParam.esIndex + "-{logIdx}/" + logParam.esType,
        /*per task instance, multiplied at runtime by the total number of Hadoop tasks running*/
        Map(ConfigurationOptions.ES_BATCH_SIZE_BYTES -> "1mb", ConfigurationOptions.ES_BATCH_SIZE_ENTRIES -> logParam.batch))
    }

If you want to use ES_MAPPING_EXCLUDE to exclude some fields, you have to set ES_INPUT_JSON to false, because, as the comment in the code says:

The es.mapping.exclude feature is ignored when es.input.json is specified

Accordingly, you should save the data as key-value maps, instead of JSON serialized from a POJO.
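To make that concrete, here is a small sketch of the difference (my own illustration; the host, index, and field names are made up):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._   // adds saveToEs on RDD

    val sc = new SparkContext(new SparkConf()
      .setAppName("exclude-demo")
      .set("es.nodes", "es-host"))     // placeholder host

    // With pre-serialized JSON strings (saveJsonToEs / es.input.json=true) the
    // connector treats each document as opaque, so es.mapping.exclude is
    // ignored and "logIdx" would be stored in Elasticsearch.

    // With maps, the connector serializes the document itself, so "logIdx"
    // can drive the dynamic index name and still be excluded:
    val docs = sc.makeRDD(Seq(Map(
      "logId"  -> "a_1",
      "logIdx" -> "2016.08.16",
      "tag"    -> "x")))
    docs.saveToEs("logs-{logIdx}/log", Map("es.mapping.exclude" -> "logIdx"))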

The most important thing in the indexing process is to set the ES_BATCH_SIZE_ENTRIES parameter, which defaults to 1000. In my opinion, it's an art to trade off the streaming batch duration against the bulk batch size: the time spent indexing a stream batch should be just below the batch duration. It is therefore also tied to the number of executor instances.
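As a rough illustration of that tradeoff (the numbers are only an example, based on the 4-executor, 2-core setup mentioned earlier in the thread):

    // es.batch.size.entries applies per task, so the cluster-wide bulk load is
    // roughly concurrentTasks * entriesPerBulk per flush round.
    val executors        = 4
    val coresPerExecutor = 2
    val entriesPerBulk   = 1000                                  // default es.batch.size.entries
    val concurrentTasks  = executors * coresPerExecutor          // 8
    val docsPerFlush     = concurrentTasks * entriesPerBulk      // up to 8000 docs in flight

    // If indexing docsPerFlush takes longer than the streaming batch duration,
    // batches start queueing up; lower entriesPerBulk (or add executors) until
    // the indexing time fits comfortably inside the duration.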