Updating an existing index using Spark

Hi,
I am having a problem updating an existing index using Spark. I followed Costin's advice per the link below. I tried a few variations with a few existing documents as a start, but I am getting either a "Field [_id] is a metadata field and cannot be added inside a document." error, or, in the worst case, the documents disappear from the index.

http://grokbase.com/t/gg/elasticsearch/14a1wdhygz/is-there-a-way-to-update-es-records-using-spark
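
If I read the error right, it comes from putting _id into the document source itself. A minimal sketch of my understanding (my own illustration, not from the linked thread; school_id is just my id field):

//putting the metadata field into the source triggers
//"Field [_id] is a metadata field and cannot be added inside a document."
val badDoc = Map("_id" -> "some-school-id", "studentTeacherRatio" -> "5")

//keeping the id in a regular field and pointing "es.mapping.id" at it works
val goodDoc = Map("school_id" -> "some-school-id", "studentTeacherRatio" -> "5")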

Here is my sample code:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

val conf = new SparkConf().setAppName("Simple Application")
  .set("es.nodes", nodes)
  .set("es.net.http.auth.user", user)
  .set("es.net.http.auth.pass", password)
  .set("es.write.operation", "update")
  .set("es.mapping.id", "school_id")

val sc = new SparkContext(conf)

//load the existing index into an RDD of (documentId, source) tuples
val esRDD = sc.esRDD("education/schools")

//add a new property to an existing document; row._1 is the document id
def addSomeProp(row: Tuple2[String, scala.collection.Map[String, AnyRef]]): Map[String, String] = {
  Map("school_id" -> row._1, "studentTeacherRatio" -> "5")
}

//take two documents for testing purposes
val testRDD = esRDD.map(row => addSomeProp(row)).take(2)

EsSpark.saveToEs(sc.parallelize(testRDD), "education/schools")

Thanks a bunch!
boreal

I got it to work! The problem was that I forgot to set "es.write.operation". I updated the code above to the working version in case anyone else needs it.
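
Side note (my own addition, based on my reading of the elasticsearch-hadoop configuration docs): "es.write.operation" also accepts "upsert", which may be the safer choice if some of the ids in the RDD do not exist in the index yet, since "update" fails for missing documents.

//hedged variation: upsert creates the document when the id is not in the index yet
val upsertConf = new SparkConf().setAppName("Simple Application")
  .set("es.nodes", nodes)
  .set("es.net.http.auth.user", user)
  .set("es.net.http.auth.pass", password)
  .set("es.write.operation", "upsert")
  .set("es.mapping.id", "school_id")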

However, now, if I try to save the entire RDD, I get the error below.

[Stage 1:> (0 + 5) / 5]
16/03/02 17:15:15 WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.rest.EsHadoopParsingException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Here is my updated code -- any help would be appreciated. Thanks!

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

val conf = new SparkConf().setAppName("Simple Application")
  .set("es.nodes", nodes)
  .set("es.net.http.auth.user", user)
  .set("es.net.http.auth.pass", password)
  .set("es.write.operation", "update")
  .set("es.mapping.id", "school_id")

val sc = new SparkContext(conf)

//load the existing index into an RDD
val esRDD = sc.esRDD("education/schools")

//add a new property to an existing document
def addSomeProp(row: Tuple2[String, scala.collection.Map[String, AnyRef]]): Map[String, String] = {
  Map("school_id" -> row._1, "studentTeacherRatio" -> "5")
}

//this time map the entire RDD instead of taking two documents
val testRDD = esRDD.map(row => addSomeProp(row))

EsSpark.saveToEs(testRDD, "education/schools")

Take a look at the logs.
From the looks of it, you are running into a parsing exception. However, Spark picks up the exception, closes the classloader, and then rethrows it; since the classloader was already closed, any classes that have not yet been loaded cannot be found, and the original exception ends up masked by the ClassNotFoundException.

I've raised an issue to try to prevent this from happening by loading the classes upfront.
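
In the meantime, a hedged debugging sketch (my own suggestion, not part of the reply above): write the documents back in small batches so the batch that triggers the parsing exception can be pinned down, then check the executor logs for the actual org.elasticsearch.hadoop.rest.EsHadoopParsingException and the offending document. This assumes the index is small enough to collect to the driver.

//save in numbered batches to narrow down which documents fail to parse
val docs = esRDD.map(row => addSomeProp(row)).collect()
docs.grouped(100).zipWithIndex.foreach { case (batch, i) =>
  println(s"saving batch $i (${batch.length} docs)")
  EsSpark.saveToEs(sc.parallelize(batch), "education/schools")
}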