I am trying to read event from kafka and extract date time, create index name using hour and do bulk insertion.
Source code:
kafkaStream.foreachRDD(rdd => {
if (rdd.count() > 0) {
val indexName:String = "test"
var eventCount:Int = 0
val client = createTransportClient(props)
createIndex(indexName, client)
val bulkRequest = client.prepareBulk()
rdd.foreach(event => {
bulkRequest.add(client.prepareIndex(indexName,"type").setSource(event.toString()))
eventCount +=1
if (eventCount%100 == 0) {
bulkRequest.execute().actionGet()
}
})
}
})
But when executing the code i am getting following exception
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
at ElasticClient$$anonfun$main$1.apply(ElasticClient.scala:210)
at ElasticClient$$anonfun$main$1.apply(ElasticClient.scala:191)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.elasticsearch.action.bulk.BulkRequestBuilder
Serialization stack:
- object not serializable (class:
org.elasticsearch.action.bulk.BulkRequestBuilder, value:
org.elasticsearch.action.bulk.BulkRequestBuilder@afd9b2)
- field
(class: ElasticClient$$anonfun$main$1$$anonfun$apply$1, name:
bulkRequest$1, type: class
org.elasticsearch.action.bulk.BulkRequestBuilder)
- object (class ElasticClient$$anonfun$main$1$$anonfun$apply$1, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
Please help