Elasticsearch and Spark

Hi,
I am using Spark to stream data and write to ES. I downloaded
elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT.jar yesterday and am using the
'saveToEs' method instead of saveAsNewAPIHadoopFiles. I get the following
error when I include the 2.1.0 jar.

scala.MatchError: org.apache.hadoop.io.MapWritable@75a899ad (of class org.apache.hadoop.io.MapWritable)
    at org.elasticsearch.spark.serialization.ScalaMapFieldExtractor.extractField(ScalaMapFieldExtractor.scala:10)
    at org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor.field(ConstantFieldExtractor.java:32)
    at org.elasticsearch.hadoop.serialization.field.AbstractIndexExtractor.append(AbstractIndexExtractor.java:101)
    at org.elasticsearch.hadoop.serialization.field.AbstractIndexExtractor.field(AbstractIndexExtractor.java:119)
    at org.elasticsearch.hadoop.serialization.field.AbstractIndexExtractor.field(AbstractIndexExtractor.java:31)
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:73)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:77)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:53)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:130)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:33)
    at org.elasticsearch.spark.rdd.EsRDDFunctions$$anonfun$saveToEs$1.apply(EsRDDFunctions.scala:43)
    at org.elasticsearch.spark.rdd.EsRDDFunctions$$anonfun$saveToEs$1.apply(EsRDDFunctions.scala:43)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

saveAsNewAPIHadoopFiles works from Spark, but I would like to use the
dynamic/multi-resource feature to create the index name dynamically from
Spark. Any help is greatly appreciated.
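
For reference, the kind of write I am after looks roughly like this (a
sketch only; the index pattern, type, and field names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._   // brings saveToEs into scope on RDDs

object DynamicIndexSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("es-dynamic-index")
      .set("es.nodes", "localhost:9200"))   // illustrative ES address

    // Plain Scala maps (no Writables); "media_type" is a made-up field
    val docs = Seq(
      Map("media_type" -> "book",  "title" -> "Title A"),
      Map("media_type" -> "movie", "title" -> "Title B"))

    // The {media_type} placeholder is resolved per document at write time,
    // so each document is routed to its own index (e.g. media-book/doc)
    sc.makeRDD(docs).saveToEs("media-{media_type}/doc")
  }
}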

I am using elasticsearch-1.2.1,
elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT.jar, spark-streaming_2.10-1.0.1


You seem to be using the Spark integration incorrectly. Make sure you use the latest snapshot and check out the docs
here [1]; they include an explicit section on multi-resource writing [2].
Note that the native Spark integration does not handle serialization/deserialization of Writables, as Spark itself does
not use them; Writables belong to Hadoop and its Map/Reduce layer, which is separate from Spark. Instead, simply get rid
of the Writables and read the data directly into Scala or Java types or, if you need to use Writables, handle the
conversion to Scala/Java types yourself (just as you would with plain Spark).
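
For instance, a minimal conversion sketch (assuming an RDD[MapWritable]
named 'writables' coming out of a Hadoop InputFormat, with string keys and
values; adapt the conversion to your actual value types):

import org.apache.hadoop.io.MapWritable
import org.elasticsearch.spark._
import scala.collection.JavaConverters._

val docs = writables.map { mw =>
  // Turn each MapWritable into a plain Scala Map before handing it to saveToEs
  mw.entrySet.asScala.map(e => e.getKey.toString -> e.getValue.toString).toMap
}
docs.saveToEs("media-{media_type}/doc")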

Hope this helps,

[1] the elasticsearch-hadoop reference documentation (Apache Spark chapter)
[2] the section on dynamic/multi-resource writes in the same chapter


--
Costin


Thank you for the reply. I am currently working on the changes.


I have been testing Spark-to-ES data indexing using saveToEs on a JavaRDD.
With the new elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT jar I am able to
insert documents into ES, but the following scenarios are not working:

if the es.mapping.id and/or es.mapping.parent properties are set in the
SparkConf, the insert fails.
if dynamic/multi-resource writing is set, the insert fails (the setup is
sketched below).
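
Roughly, the failing call looks like this (a simplified Scala sketch of the
equivalent; the index pattern, field, and id names here are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val sc = new SparkContext(new SparkConf().setAppName("es-write-repro"))

// One java.util.HashMap per document, matching the
// "(of class java.util.HashMap)" in the exception below
val doc = new java.util.HashMap[String, String]()
doc.put("docId", "1")
doc.put("type", "error")

// Fails once an id mapping or a dynamic resource pattern is involved
sc.makeRDD(Seq(doc)).saveToEs("logs-{type}/entry", Map("es.mapping.id" -> "docId"))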

I get the following exception in both scenarios:

(of class java.util.HashMap)
    at org.elasticsearch.spark.serialization.ScalaMapFieldExtractor.extractField(ScalaMapFieldExtractor.scala:10)
    at org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor.field(ConstantFieldExtractor.java:32)
    at org.elasticsearch.hadoop.serialization.field.AbstractIndexExtractor.append(AbstractIndexExtractor.java:101)
    at org.elasticsearch.hadoop.serialization.field.AbstractIndexExtractor.field(AbstractIndexExtractor.java:119)
    at org.elasticsearch.hadoop.serialization.field.AbstractIndexExtractor.field(AbstractIndexExtractor.java:31)
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:73)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:77)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:53)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:130)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:33)
    at org.elasticsearch.spark.rdd.EsRDDFunctions$$anonfun$saveToEs$1.apply(EsRDDFunctions.scala:43)
    at org.elasticsearch.spark.rdd.EsRDDFunctions$$anonfun$saveToEs$1.apply(EsRDDFunctions.scala:43)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Your help is greatly appreciated.

SaraB


Sorry to hear that.

I strongly suggest updating to the latest SNAPSHOT, as both dynamic resources and es.mapping.id (and co) are working
(there are several tests hitting exactly this in the suite).
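
For comparison, a known-working shape is along these lines (a sketch only;
index and field names are illustrative); note the plain Scala maps instead
of java.util.HashMap:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val sc = new SparkContext(new SparkConf().setAppName("es-id-sketch"))

// Plain Scala maps; the "id" field is extracted as the document _id
val docs = sc.makeRDD(Seq(
  Map("id" -> "1", "type" -> "error", "msg" -> "boom"),
  Map("id" -> "2", "type" -> "warn",  "msg" -> "careful")))

docs.saveToEs("logs-{type}/entry", Map("es.mapping.id" -> "id"))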
Going forward, please try to provide more information on what your code looks like and the version you are using; with
a snapshot this is not obvious, however on each run the exact project version and signature are logged, for example:

INFO main util.Version - Elasticsearch Hadoop v2.1.0.BUILD-SNAPSHOT [b86405c7b2]

This is described at length in [1].

Cheers,

[1] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/troubleshooting.html#help


--
Costin


This is still an issue with both the Beta and the latest snapshot builds;
by latest I mean 9/26/2014. The field _id is clearly present in the Map
that is being indexed.

2014-09-27 20:55:20,634 [Executor task launch worker-3] ERROR org.apache.spark.executor.Executor - Exception in task 17.7 in stage 0.0 (TID 210)

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [ScalaMapFieldExtractor for field [_id]] cannot extract value from object [Map(count -> 1, location -> Greater New York City Area, name -> Clinical Exposure and Mentoring Program-Volunteer, geoloc -> [ -74.00597309999999 , 40.7143528], _id -> 541ff36fe4b064f2e4d6a14a)]


Not sure whether you've seen the reply on GitHub, but a fix was pushed (I assume you are using a mutable collection
and, due to an incorrect import, the extractor was expecting an immutable one).
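
To illustrate the kind of mismatch (a paraphrase, not the actual connector
source): pattern-matching against the immutable Map type silently rejects a
mutable one, so the extractor reports it cannot extract the value:

import scala.collection.{immutable, mutable}

def extract(value: Any): Option[Any] = value match {
  case m: immutable.Map[_, _] =>
    m.asInstanceOf[immutable.Map[String, Any]].get("_id")
  case _ =>
    None   // mutable.Map (and java.util.Map) fall through here
}

extract(immutable.Map("_id" -> 1))   // Some(1)
extract(mutable.Map("_id" -> 1))     // None -> "cannot extract value" upstream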


--
Costin
