Spark job fails after Elasticsearch 2.3.5 install

Hello,

I had an ES cluster running version 1.7, and the command below worked without any issues:

spark-submit --num-executors 10 --name mc-es --jars lib/elasticsearch-hadoop-2.1.0.jar,lib/elasticsearch-spark_2.11-2.1.0.jar lib/mc-es-loader_2.10-1.0.jar

Now I have set up a new cluster with ES 2.3.5, and the Spark job submitted with the above command fails with the following error:

16/08/22 11:42:00 INFO scheduler.DAGScheduler: Job 2 failed: runJob at EsSparkSQL.scala:56, took 2.722739 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 14): java.lang.StringIndexOutOfBoundsException: String index out of range: -15
at java.lang.String.substring(String.java:1911)
at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:110)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:372)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:56)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:56)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

I changed the spark-submit command as below:

"spark-submit --num-executors 10 --name "mc-es" --jars lib/elasticsearch-hadoop-2.3.4.jar,lib/elasticsearch-spark_2.11-2.3.4.jar lib/mc-es-loader_2.10-1.0.jar"

which failed with the following error:

16/08/22 12:29:19 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 10, dayrhewdld005.enterprisenet.org): java.lang.Error: Multiple ES-Hadoop versions detected in the classpath; please use only one
jar:file:/mnt/dsk/7/yarn/nm/usercache/mcsmdvusr/appcache/application_1470295768542_30266/container_1470295768542_30266_01_000009/elasticsearch-hadoop-2.3.4.jar
jar:file:/mnt/dsk/7/yarn/nm/usercache/mcsmdvusr/appcache/application_1470295768542_30266/container_1470295768542_30266_01_000009/elasticsearch-spark_2.11-2.3.4.jar

If I use elasticsearch-spark_2.11-2.3.4.jar alone in the spark-submit command, I see the error below:

16/08/22 12:47:29 INFO scheduler.DAGScheduler: Job 0 finished: parquetFile at TableLoader.scala:55, took 2.698951 s
Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:44)
at org.elasticsearch.spark.sql.package$SparkDataFrameFunctions.saveToEs(package.scala:26)
at nielsen.mces.TableLoader$$anonfun$main$2.apply(TableLoader.scala:55)
at nielsen.mces.TableLoader$$anonfun$main$2.apply(TableLoader.scala:52)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at nielsen.mces.TableLoader$.main(TableLoader.scala:52)
at nielsen.mces.TableLoader.main(TableLoader.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I am using Scala 2.11.7 and spark-sql_2.10 (1.3.0) to build mc-es-loader_2.10-1.0.jar.

Any help on this, please? I understand that a mismatch of jar versions is causing the issue.
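
The NoSuchMethodError on scala.Predef$.ArrowAssoc looks like the classic symptom of mixing Scala 2.10 and 2.11 binaries on the same classpath. As a sketch (assuming an sbt build; this is not my actual build file, and the 2.10.6 patch version is just an example), keeping everything on a single Scala binary version would look like:

// build.sbt (sketch) -- the app, Spark, and ES-Hadoop must all use the SAME
// Scala binary version; 2.10.x here matches spark-sql_2.10 and the
// mc-es-loader_2.10 artifact name (2.10.6 is an assumed patch version)
name := "mc-es-loader"
version := "1.0"
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"            % "1.3.0" % "provided",
  // elasticsearch-hadoop already bundles the Spark integration, so the
  // separate elasticsearch-spark jar should not be on the classpath as well
  "org.elasticsearch" %  "elasticsearch-hadoop" % "2.3.4" % "provided"
)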

Thanks

This issue was fixed after building the jar against the right version of elasticsearch-hadoop (2.3.4) and changing the spark-submit command as below:

spark-submit --num-executors 10 --name "mc-es" --jars lib/elasticsearch-hadoop-2.3.4.jar lib/mc-es-loader_2.10-1.0.jar

However, the string values come back looking encrypted on the ES side (they are actually Base64-encoded); not sure whether I have to set some property to avoid this:

"pnm": "VHJhdmlzIFNjb3R0",
"upnm": "VHJhdmlzIFNjb3R0",
"rd": "MjAxNS0wNy0zMA==",

Setting the property below fixed the issue I was having with the string content in ES 2.3.4:

val sql = new SQLContext(ctx)
sql.setConf("spark.sql.parquet.binaryAsString", "true")