I have ES, Spark, and the ES-Hadoop adapter installed on my laptop, and I wrote a
simple Scala notebook to test the ES adapter.
Everything was fine until I started experimenting with more sophisticated
features. This is the snippet that drives me crazy:
%AddJar file:///tools/elasticsearch-hadoop-2.1.0.Beta3/dist/elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT.jar
%AddJar file:///tools/elasticsearch-hadoop-2.1.0.Beta3/dist/elasticsearch-spark_2.10-2.1.0.BUILD-SNAPSHOT.jar
import org.elasticsearch.spark.rdd._
val q2 = """{
  |"query": { "term": { "appName": "console" } },
  |"aggregations": {
  |  "unusual": {
  |    "significant_terms": { "field": "pathname" }
  |  }
  |}
  |}""".stripMargin

val res = sc.esRDD("logs/app", q2)
println("Matches: " + res.count())
When I run the code I get this exception:
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 2 in stage 15.0 failed 1 times, most recent failure: Lost task 2.0 in stage 15.0 (TID 58, localhost): org.apache.spark.util.TaskCompletionListenerException: SearchPhaseExecutionException[Failed to execute phase [init_scan], all shards failed; shardFailures {[N1R-UlgOQCGXCFCtbJ3sBQ][logrecords][2]: ElasticsearchIllegalArgumentException[aggregations are not supported with search_type=scan]}]
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
"aggregations are not supported with search_type=scan", which is fine.
The question is: how do I set search_type to the right value (e.g. count) in the sc.esRDD() call?
I tried several places in the q2 json with no success and I was not able to find an answer through
the documentation. I would appreciate any help.
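
For the record, this is the kind of variant I tried; the top-level "search_type" key is pure guesswork on my part, since I found nothing in the docs saying where (or whether) it can go in the request body:

// One of my attempts: a top-level "search_type" key in the query body.
// It still fails with the same "all shards failed" error as above.
val q2Count = """{
  |"search_type": "count",
  |"query": { "term": { "appName": "console" } },
  |"aggregations": {
  |  "unusual": {
  |    "significant_terms": { "field": "pathname" }
  |  }
  |}
  |}""".stripMargin

val res2 = sc.esRDD("logs/app", q2Count)
println("Matches: " + res2.count())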
However, I see a possible inconsistency with the behaviour of the ES API when it is called
directly via cURL. The same query, with no search_type setting at all, works correctly:
curl 'localhost:9200/logs/app/_search?pretty' -d '{
  "query": { "term": { "appName": "console" } },
  "aggregations": { "unusual": { "significant_terms": { "field": "pathname" } } }
}'
It returns both hits:{} and aggregations:{}. Why does the Spark integration not behave the same way?
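
For completeness, the cURL response is shaped roughly like this (abridged; the actual hit and bucket contents depend on my data):

{
  "hits": { "total": ..., "hits": [ ... ] },
  "aggregations": {
    "unusual": {
      "doc_count": ...,
      "buckets": [ ... ]
    }
  }
}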