How do I get aggregations working in the Elasticsearch Spark adapter?

I have Elasticsearch, Spark, and the ES-Hadoop adapter installed on my laptop, and I wrote a
simple Scala notebook to test the adapter.
Everything was fine until I moved on to more sophisticated
features. This is the snippet that drives me crazy:

%AddJar file:///tools/elasticsearch-hadoop-2.1.0.Beta3/dist/elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT.jar
%AddJar file:///tools/elasticsearch-hadoop-2.1.0.Beta3/dist/elasticsearch-spark_2.10-2.1.0.BUILD-SNAPSHOT.jar

import org.elasticsearch.spark.rdd._

val q2 = """{
     |"query" : { "term": { "appName": "console" } },
     |"aggregations": {
     |  "unusual": {
     |    "significant_terms": {"field": "pathname"}
     |  }
     |}
     |}""".stripMargin

val res = sc.esRDD("logs/app", q2)

println("Matches: " + res.count())

When I run the code I get this exception:

Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 2 in stage 15.0 failed 1 times, most recent failure: Lost task 2.0 in stage 15.0 (TID 58, localhost): org.apache.spark.util.TaskCompletionListenerException: SearchPhaseExecutionException[Failed to execute phase [init_scan], all shards failed; shardFailures {[N1R-UlgOQCGXCFCtbJ3sBQ][logrecords][2]: ElasticsearchIllegalArgumentException[aggregations are not supported with search_type=scan]}]
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

"aggregations are not supported with search_type=scan", which is fair enough.
The question is: how do I set search_type to the right value (e.g. count) in the sc.esRDD() call?
I tried placing it in several spots in the q2 JSON with no success, and I could not find an answer in
the documentation. I would appreciate any help.

However, I see a possible inconsistency with the behaviour of the ES API when used directly via cURL.
The same query, without any search_type setting, works correctly:

curl 'localhost:9200/logs/app/_search?pretty' -d'{"query" : { "term": { "appName": "console" } },
"aggregations": { "unusual": { "significant_terms": {"field": "pathname"} }}}'

returns both hits:{} and aggregations:{}. Why doesn't the Spark integration work the same way?

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/d044d380-a4b2-4d22-8990-60f318f7601a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

The short answer is that the connector relies on scan/scroll search for its core functionality, and with aggregations it
needs to switch the way it queries the cluster to a count search.
This is the last major feature that needs to be addressed before the 2.1 release. There's also an issue raised for it
here [1], which you can track.

Cheers,

[1] Enhance aggregation support: https://github.com/elastic/elasticsearch-hadoop/issues/276
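Until that lands, one possible workaround is to run the aggregation query against the REST API directly, outside the connector. Note that search_type is a URL parameter in Elasticsearch's search API, not a field of the request body, which is why placing it inside the q2 JSON has no effect. A minimal sketch (assumes an ES 1.x node on localhost:9200; the index and query are the ones from the original post, and the object/method names here are illustrative):

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object AggWorkaround {
  // search_type belongs in the URL query string, not in the JSON body.
  def searchUrl(host: String, index: String, searchType: String): String =
    s"$host/$index/_search?search_type=$searchType"

  // POST the query body and return the raw JSON response; its
  // "aggregations" section holds the aggregation results.
  def post(url: String, body: String): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.getOutputStream.write(body.getBytes("UTF-8"))
    Source.fromInputStream(conn.getInputStream, "UTF-8").mkString
  }
}

// Against a live cluster (not run here) this would be:
//   val json = AggWorkaround.post(
//     AggWorkaround.searchUrl("http://localhost:9200", "logs/app", "count"), q2)
println(AggWorkaround.searchUrl("http://localhost:9200", "logs/app", "count"))
```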

On 4/1/15 12:53 PM, michele crudele wrote:


--
Costin


Thanks,

when is the 2.1 release coming?

Another question, which I think is related to this one: I was able to
run this piece of code using facets:

val q5 = """
|{
|  "query": {
|    "match": { "_all": "error"}},
|  "facets":{
|    "appName": {"terms": {"field": "appName"}},
|    "sourceName": {"terms": {"field": "sourceName"}}}
|}
""".stripMargin

println("Query: " + q5)

val rdd = sc.esRDD("logs/app", q5)

What I get from the RDD are (docID, Map[field -> value]) tuples. Should
I also expect to find the facets in there? If so, how do I get them?
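For context on the shape of those tuples: esRDD streams only the hits, as (documentId, field map) pairs. Facet and aggregation results live in a separate section of the search response, not in the per-document stream, so they cannot appear among the tuples. A tiny illustration with made-up data (IDs and field values are fabricated; only the shape matches what esRDD returns):

```scala
// Fabricated sample of what the RDD's elements look like -- hits only.
val hits: Seq[(String, Map[String, Any])] = Seq(
  ("AUz1", Map("appName" -> "console", "sourceName" -> "web")),
  ("AUz2", Map("appName" -> "console", "sourceName" -> "batch"))
)

// Iterating yields per-document fields; there is no slot for facets here.
hits.foreach { case (id, fields) =>
  println(s"$id -> appName=${fields("appName")}")
}
```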

On Wednesday, April 1, 2015 at 12:02:20 PM UTC+2, Costin Leau wrote:


Does anyone have an answer for this? Thanks in advance.

On Wednesday, April 1, 2015 at 5:58:19 PM UTC+2, michele crudele wrote:


Facets are deprecated and will be removed; as such, there is no support for them, nor any plans to add support in the near
future.

As for when aggregations will land in 2.1: the near future. I don't want to give estimates (only to miss them), but
let's just say it's a very high priority.
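Given that, the q5 query above can be rewritten with the equivalent terms aggregations (same field names; a hedged sketch, and note the connector still returns only hits, so this form is for direct REST use until issue #276 is resolved):

```scala
// q5 migrated from the deprecated facets syntax to terms aggregations.
val q5Aggs = """
  |{
  |  "query": { "match": { "_all": "error" } },
  |  "aggregations": {
  |    "appName":    { "terms": { "field": "appName" } },
  |    "sourceName": { "terms": { "field": "sourceName" } }
  |  }
  |}""".stripMargin

println(q5Aggs)
```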

Cheers,

On 4/8/15 11:32 PM, michele crudele wrote:


--
Costin
