Inserting data from Spark application creates unassigned shards


(Zoran Jeremic) #1

I have a Scala Spark application that should store some data in ES 2.3.3. I'm trying to insert some very simple data using the following code:

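import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

// master and maxCores are defined elsewhere in the application
val sparkConf = new SparkConf()
  .setMaster(master)
  .set("spark.cores.max", maxCores.toString) // cap on total cores requested
  .set("spark.executor.memory", "4g")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .setAppName("testapp")
  // elasticsearch-hadoop connector settings
  .set("es.index.auto.create", "true")
  .set("es.nodes", "127.0.0.1")
  .set("es.port", "9200")
  .set("es.http.timeout", "5m")
  .set("es.scroll.size", "50")
val sc = new SparkContext(sparkConf)

// Write a single document to index "spark", type "docs"
val r2 = Map("arrival" -> "LA", "SFO" -> "San Fran")
val rdd = sc.makeRDD(Seq(r2))
EsSpark.saveToEs(rdd, "spark/docs")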

When this code is executed, it inserts the data, but the index ends up with unassigned shards and the cluster state is Yellow. I checked the logs but didn't find anything useful. It seems that 4 shards are created and then the following error occurs before the fifth shard is created:

[2016-07-24 20:11:09,369][DEBUG][cluster.service ] [Cottonmouth] cluster state update task [create-index [spark], cause [api]] failed
[spark] IndexAlreadyExistsException[already exists]
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexName(MetaDataCreateIndexService.java:138)
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validate(MetaDataCreateIndexService.java:473)
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.access$100(MetaDataCreateIndexService.java:97)
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService$1.execute(MetaDataCreateIndexService.java:192)
at org.elasticsearch.cluster.ClusterStateUpdateTask.execute(ClusterStateUpdateTask.java:45)
at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-07-24 20:11:09,372][DEBUG][cluster.service ] [Cottonmouth] processing [create-index [spark], cause [api]]: took 3ms no change in cluster_state
[2016-07-24 20:11:09,372][DEBUG][rest.suppressed ] /spark Params: {index=spark}
[spark] IndexAlreadyExistsException[already exists]
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexName(MetaDataCreateIndexService.java:138)
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validate(MetaDataCreateIndexService.java:473)
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.access$100(MetaDataCreateIndexService.java:97)
at org.elasticsearch.cluster.metadata.MetaDataCreateIndexService$1.execute(MetaDataCreateIndexService.java:192)
at org.elasticsearch.cluster.ClusterStateUpdateTask.execute(ClusterStateUpdateTask.java:45)
at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-07-24 20:11:09,410][DEBUG][index.engine ] [Cottonmouth] [spark][0] no translog ID present in the current generation - creating one

Any idea how to fix the problem?


(Jimmy Kuang) #2

The error message says the index already exists, but you're saying it still creates the index and then leaves some shards unassigned?
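
One thing that might be worth ruling out, assuming this is a single-node cluster running with the ES 2.x defaults of 5 primary shards and 1 replica per index (the thread does not state the node count, so this is only a guess): a replica is never allocated to the same node as its primary, so on one node all 5 replica copies would stay unassigned and the cluster would report Yellow. A minimal Scala sketch of that arithmetic follows; the object and function names are illustrative only, not part of Elasticsearch or elasticsearch-hadoop, and it ignores other allocation rules such as disk watermarks.

```scala
object ShardMath {
  // Hypothetical helper: counts shard copies that cannot be allocated when
  // every copy of a given shard has to live on a different data node.
  def unassignedCopies(primaries: Int, replicas: Int, dataNodes: Int): Int = {
    require(primaries > 0 && replicas >= 0 && dataNodes > 0)
    val copiesPerShard = 1 + replicas                          // primary + replicas
    val assignablePerShard = math.min(copiesPerShard, dataNodes)
    primaries * (copiesPerShard - assignablePerShard)
  }

  def main(args: Array[String]): Unit = {
    // Assumed ES 2.x defaults on a single node
    println(unassignedCopies(primaries = 5, replicas = 1, dataNodes = 1)) // 5
    // Same index once a second data node joins
    println(unassignedCopies(primaries = 5, replicas = 1, dataNodes = 2)) // 0
  }
}
```

If that turns out to be the cause, either setting number_of_replicas to 0 for the index or adding a second node should bring the cluster back to Green.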

