[Spark] SchemaRdd saveToEs produces "Bad JSON" errors

Hi,

I'm a very new user to ES so Im hoping someone can point me to what I'm
doing wrong.

I did the following:

download a fresh copy of Spark1.2.1. Switch to bin folder and ran:

wget https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/elasticsearch-hadoop/2.1.0.BUILD-SNAPSHOT/elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341.jar
./spark-shell --jars elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341.jar

import org.apache.spark.sql.SQLContext

case class KeyValue(key: Int, value: String)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._

sc.parallelize(1 to 50).map(i=>KeyValue(i, i.toString))
.saveAsParquetFile("large.parquet")
parquetFile("large.parquet").registerTempTable("large")

val schemaRDD = sql("SELECT * FROM large")
import org.elasticsearch.spark._

schemaRDD.saveToEs("test/spark")

At this point I get errors like this:

15/03/24 11:02:35 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/03/24 11:02:35 INFO DAGScheduler: Job 2 failed: runJob at EsSpark.scala:51, took 0.302236 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 10, localhost): org.apache.spark.util.TaskComplet
ionListenerException: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[[38,"38"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive x
content from (offset=13, length=9): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 56, 44, 34, 51, 56, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51
, 57, 44, 34, 51, 57, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 48, 44, 34, 52, 48, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 4
9, 44, 34, 52, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 50, 44, 34, 52, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 51,
44, 34, 52, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 52, 44, 34, 52, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 53, 44,
34, 52, 53, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 54, 44, 34, 52, 54, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 55, 44, 34
, 52, 55, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 56, 44, 34, 52, 56, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 57, 44, 34, 5
2, 57, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 48, 44, 34, 53, 48, 34, 93, 10]]; ]]; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)

Any idea what I'm doing wrong? (I started with the Beta3 jar before trying
the nightly but also with no luck). I am running against elasticsearch1.4.4

/spark_1.2.1/bin$ curl localhost:9200
{
"status" : 200,
"name" : "Wizard",
"cluster_name" : "elasticsearch",
"version" : {
"number" : "1.4.4",
"build_hash" : "c88f77ffc81301dfa9dfd81ca2232f09588bd512",
"build_timestamp" : "2015-02-19T13:05:36Z",
"build_snapshot" : false,
"lucene_version" : "4.10.3"
},
"tagline" : "You Know, for Search"
}

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/c9c63ee8-0683-46b3-b023-de348d34b560%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Hi,

It appears the problem lies with what type of document you are trying to save to ES - which is basically invalid.
For some reason the table/rdd schema gets lost and the content is serialized without it:

|[38,"38"]|

I'm not sure why this happens hence why I raised an issue [1]

Thanks,

[1] https://github.com/elastic/elasticsearch-hadoop/issues/403

On 3/24/15 5:11 PM, Yana Kadiyska wrote:

Hi,

I'm a very new user to ES so Im hoping someone can point me to what I'm doing wrong.

I did the following:

download a fresh copy of Spark1.2.1. Switch to bin folder and ran:

|wget https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/elasticsearch-hadoop/2.1.0.BUILD-SNAPSHOT/elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341.jar
./spark-shell --jars elasticsearch-hadoop-2.1.0.BUILD-20150324.023417-341.jar

import org.apache.spark.sql.SQLContext

case class KeyValue(key: Int, value: String)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._

sc.parallelize(1 to 50).map(i=>KeyValue(i, i.toString))
.saveAsParquetFile("large.parquet")
parquetFile("large.parquet").registerTempTable("large")

val schemaRDD = sql("SELECT * FROM large")
import org.elasticsearch.spark._

schemaRDD.saveToEs("test/spark")
|

At this point I get errors like this:

|15/03/24 11:02:35 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/03/24 11:02:35 INFO DAGScheduler: Job 2 failed: runJob at EsSpark.scala:51, took 0.302236 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 10, localhost): org.apache.spark.util.TaskComplet
ionListenerException: Found unrecoverable error [Bad Request(400) - Invalid JSON fragment received[[38,"38"]][MapperParsingException[failed to parse]; nested: ElasticsearchParseException[Failed to derive x
content from (offset=13, length=9): [123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51, 56, 44, 34, 51, 56, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 51
, 57, 44, 34, 51, 57, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 48, 44, 34, 52, 48, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 4
9, 44, 34, 52, 49, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 50, 44, 34, 52, 50, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 51,
44, 34, 52, 51, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 52, 44, 34, 52, 52, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 53, 44,
34, 52, 53, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 54, 44, 34, 52, 54, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 55, 44, 34
, 52, 55, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 56, 44, 34, 52, 56, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 52, 57, 44, 34, 5
2, 57, 34, 93, 10, 123, 34, 105, 110, 100, 101, 120, 34, 58, 123, 125, 125, 10, 91, 53, 48, 44, 34, 53, 48, 34, 93, 10]]; ]]; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
|

Any idea what I'm doing wrong? (I started with the Beta3 jar before trying the nightly but also with no luck). I am
running against elasticsearch1.4.4

|/spark_1.2.1/bin$ curl localhost:9200
{
"status" : 200,
"name" : "Wizard",
"cluster_name" : "elasticsearch",
"version" : {
"number" : "1.4.4",
"build_hash" : "c88f77ffc81301dfa9dfd81ca2232f09588bd512",
"build_timestamp" : "2015-02-19T13:05:36Z",
"build_snapshot" : false,
"lucene_version" : "4.10.3"
},
"tagline" : "You Know, for Search"
}
|

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to
elasticsearch+unsubscribe@googlegroups.com mailto:elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit
https://groups.google.com/d/msgid/elasticsearch/c9c63ee8-0683-46b3-b023-de348d34b560%40googlegroups.com
https://groups.google.com/d/msgid/elasticsearch/c9c63ee8-0683-46b3-b023-de348d34b560%40googlegroups.com?utm_medium=email&utm_source=footer.
For more options, visit https://groups.google.com/d/optout.

--
Costin

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/5511A5B1.4070907%40gmail.com.
For more options, visit https://groups.google.com/d/optout.