Unable to write to Elasticsearch using Spark in Java (throws java.lang.IncompatibleClassChangeError: Implementing class)

I am using a simple Java program to index a Spark JavaRDD into
Elasticsearch. My code looks like this:

    SparkConf conf = new SparkConf().setAppName("IndexDemo").setMaster("spark://ct-0094:7077");
    conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
    conf.set("es.index.auto.create", "true");
    conf.set("es.nodes", "192.168.50.103");
    conf.set("es.port", "9200");
    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.addJar("./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

    String arrayval = "string";
    List<Data> data = Arrays.asList(
            new Data(1L, 10L, arrayval + "1"),
            new Data(2L, 20L, arrayval + "2"),
            new Data(3L, 30L, arrayval + "3"),
            new Data(4L, 40L, arrayval + "4"),
            new Data(5L, 50L, arrayval + "5"),
            new Data(6L, 60L, arrayval + "6"),
            new Data(7L, 70L, arrayval + "7"),
            new Data(8L, 80L, arrayval + "8"),
            new Data(9L, 90L, arrayval + "9"),
            new Data(10L, 100L, arrayval + "10")
    );

    JavaRDD<Data> javaRDD = sc.parallelize(data);
    saveToEs(javaRDD, "index/type");

Running the above code gives this exception (stack trace):

15/01/16 13:20:41 INFO spark.SecurityManager: Changing view acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: Changing modify acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/01/16 13:20:41 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/16 13:20:41 INFO Remoting: Starting remoting
15/01/16 13:20:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ct-0015:55586]
15/01/16 13:20:41 INFO util.Utils: Successfully started service 'sparkDriver' on port 55586.
15/01/16 13:20:41 INFO spark.SparkEnv: Registering MapOutputTracker
15/01/16 13:20:41 INFO spark.SparkEnv: Registering BlockManagerMaster
15/01/16 13:20:41 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150116132041-f924
15/01/16 13:20:41 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB
15/01/16 13:20:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/16 13:20:41 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a65b108f-e131-480a-85b2-ed65650cf991
15/01/16 13:20:42 INFO spark.HttpServer: Starting HTTP Server
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:34049
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'HTTP file server' on port 34049.
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/01/16 13:20:42 INFO ui.SparkUI: Started SparkUI at http://ct-0015:4040
15/01/16 13:20:42 INFO client.AppClient$ClientActor: Connecting to master spark://ct-0094:7077...
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150116131933-0078
15/01/16 13:20:42 INFO netty.NettyBlockTransferService: Server created on 34762
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/01/16 13:20:42 INFO storage.BlockManagerMasterActor: Registering block manager ct-0015:34762 with 2.3 GB RAM, BlockManagerId(, ct-0015, 34762)
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/01/16 13:20:43 INFO spark.SparkContext: Added JAR ./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://192.168.50.103:34049/jars/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1421394643161
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:30)
    at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:24)
    at org.elasticsearch.spark.rdd.api.java.JavaEsSpark$.saveToEs(JavaEsSpark.scala:28)
    at org.elasticsearch.spark.rdd.api.java.JavaEsSpark.saveToEs(JavaEsSpark.scala)
    at com.cleartrail.spark.poc.elasticsearch.demo.ESPerformerClass.main(ESPerformerClass.java:39)

I have the following dependencies in pom.xml:

 <dependencies>
  <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0</version>
  </dependency>
  <dependency>
      <groupId>org.spark-project</groupId>
      <artifactId>spark-streaming_2.9.2</artifactId>
      <version>0.7.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.2.0</version>
  </dependency>
  <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>2.0.2</version>
  </dependency>
 </dependencies>

I am using Elasticsearch 0.90.3 and Apache Spark 1.2.0.

Is there a version mismatch? Or is the saveToEs method deprecated?

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/aead7fef-ac62-48df-95d6-3765e139dd5e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Hi,

Most likely some of your classes were compiled against old libraries - it could even be your own jar.
Spark relies on Java serialization, so if your classes or libraries change, you need to make sure the updated
version is used throughout the entire chain.

Oh, and by the way, you seem to be using es-hadoop 2.1.Beta (and its Java API for Spark), yet your classpath
relies on es-hadoop 2.0.
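A quick way to see which versions actually end up on the classpath is Maven's dependency tree; something like the following (assuming a standard Maven build, run from the project root - the group-id filters are just examples):

```shell
# Show every resolved artifact under the elasticsearch group id
mvn dependency:tree -Dincludes=org.elasticsearch

# Likewise for the Spark artifacts, including the legacy org.spark-project group
mvn dependency:tree -Dincludes=org.apache.spark,org.spark-project
```

If two different versions of the same artifact (or the same classes under two group ids) show up, that is usually the source of an IncompatibleClassChangeError.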
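For reference, a minimal dependency set that avoids the clash might look like the sketch below. It drops the obsolete org.spark-project:spark-streaming_2.9.2 artifact (which drags in Scala 2.9-era classes incompatible with Spark 1.2 / Scala 2.10) and replaces elasticsearch-hadoop 2.0.2 with a 2.1-line Spark artifact; the exact beta version shown is an assumption and must match the API you compile against:

```xml
<dependencies>
  <dependency> <!-- Spark core for Scala 2.10 -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0</version>
  </dependency>
  <dependency> <!-- es-hadoop Spark support; JavaEsSpark lives here in the 2.1 line -->
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark_2.10</artifactId>
      <version>2.1.0.Beta3</version> <!-- assumed 2.1 beta version -->
  </dependency>
</dependencies>
```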

Cheers,

On 1/16/15 10:40 AM, Abhishek Patel wrote:


--
Costin
