Exception while bulk inserting using Spark


(Ramky) #1

I am trying to read events from Kafka, extract the date and time from each one, build an index name from the hour, and bulk-insert the events.
Source code:
    kafkaStream.foreachRDD(rdd => {
      if (rdd.count() > 0) {
        val indexName: String = "test"
        var eventCount: Int = 0
        val client = createTransportClient(props)
        createIndex(indexName, client)
        val bulkRequest = client.prepareBulk()
        rdd.foreach(event => {
          bulkRequest.add(client.prepareIndex(indexName, "type").setSource(event.toString()))
          eventCount += 1
          if (eventCount % 100 == 0) {
            bulkRequest.execute().actionGet()
          }
        })
      }
    })

But when I execute the code, I get the following exception:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
at ElasticClient$$anonfun$main$1.apply(ElasticClient.scala:210)
at ElasticClient$$anonfun$main$1.apply(ElasticClient.scala:191)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.elasticsearch.action.bulk.BulkRequestBuilder
Serialization stack:
- object not serializable (class: org.elasticsearch.action.bulk.BulkRequestBuilder, value: org.elasticsearch.action.bulk.BulkRequestBuilder@afd9b2)
- field (class: ElasticClient$$anonfun$main$1$$anonfun$apply$1, name: bulkRequest$1, type: class org.elasticsearch.action.bulk.BulkRequestBuilder)
- object (class ElasticClient$$anonfun$main$1$$anonfun$apply$1, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

Please help


(Costin Leau) #2

@Ramky please add some formatting.

As for the exception, this is not related to the Elasticsearch Hadoop/Spark connector - you are not using it but rather relying on the Elasticsearch Java Client. In other words, it's custom code.
The exception itself is a standard one in Spark: the function you pass to rdd.foreach is serialized and shipped to the executors, and it captures bulkRequest, a BulkRequestBuilder, which is not serializable (see the "Caused by" line in your trace).

There are various ways to work around this, all of which are Java/Spark specific rather than Elasticsearch specific. If modifying the code doesn't work, then I suggest looking at the Elasticsearch Spark connector, which is what this forum is all about.
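
One common workaround is to create the non-serializable client on the executor, once per partition, instead of capturing it from the driver. The following is a minimal sketch of that pattern, not a drop-in fix: BulkSink and PartitionWriter are hypothetical names introduced here for illustration, and the sketch deliberately leaves out Spark and the Elasticsearch client so that it runs on its own. It also sends the final partial batch, which the code in the question never does when the event count is not a multiple of 100.

```scala
// Hypothetical stand-in for a non-serializable resource, such as a wrapper
// around the transport client and its bulk request builder.
trait BulkSink {
  def send(batch: Seq[String]): Unit
  def close(): Unit
}

object PartitionWriter {
  // Writes all events of one partition in batches of `batchSize` and
  // returns the number of events sent.
  def writePartition(events: Iterator[String],
                     newSink: () => BulkSink,
                     batchSize: Int = 100): Int = {
    // The sink is created here, inside the function that runs on the
    // executor, so it never has to be serialized.
    val sink = newSink()
    try {
      var sent = 0
      // grouped() also yields the last, possibly smaller, batch.
      events.grouped(batchSize).foreach { batch =>
        sink.send(batch)
        sent += batch.size
      }
      sent
    } finally {
      sink.close()
    }
  }
}

// Assumed usage inside the streaming job (EsBulkSink is hypothetical):
//   rdd.foreachPartition { events =>
//     PartitionWriter.writePartition(events.map(_.toString), () => new EsBulkSink(props))
//   }
```

With this shape, the only thing Spark has to serialize is the factory function, so that function must not itself capture a client or a builder created on the driver.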

