Newbie question about Spark and Elasticsearch

I am trying to understand how Spark and ES work together. Could someone please
help me answer this question?

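import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{MapWritable, Text}
import org.elasticsearch.hadoop.mr.EsInputFormat

val conf = new Configuration()
conf.set("es.resource", "radio/artists") // index/type to read
conf.set("es.query", "?q=me*")           // URI query
// sc is an existing SparkContext; a Configuration goes with the new Hadoop API
val esRDD = sc.newAPIHadoopRDD(conf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()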

When and where is data being transferred from ES? Is it all collected on
the Spark master node, then partitioned and sent to the worker nodes? Or is
each worker node talking to ES to somehow get a partition of the data?

How does this effectively work?

Thanks a lot,
Mohamed.

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/CAEU_gmf9Nt0xn_0NbzDn_moRWUT96uWYf4cicJdZik3r0Zz8XA%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Hi,

First off, I recommend using the native integration (aka the Java/Scala APIs) instead of MapReduce. The latter works, but
the former performs better and is more flexible.

ES works in a similar fashion to the HDFS store: the data doesn't go through the master; rather, each task has its own
partition and works on its own set of data. Behind the scenes we map each worker to an index shard (if there aren't
enough workers, then some will work across multiple shards).
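
To make the shard-to-worker mapping described above concrete, here is a minimal sketch in plain Scala. It is a toy model and not the connector's code: the names `ShardAssignment` and `assign` and the round-robin rule are assumptions made for illustration. The only thing it takes from the explanation is that shards are spread over the workers, and that a worker handles several shards when there are fewer workers than shards.

```scala
object ShardAssignment {
  // Toy model: distribute shard ids over a fixed number of workers,
  // round-robin. This is an illustrative assumption, not es-hadoop's
  // actual scheduling logic.
  def assign(shards: Seq[Int], workers: Int): Map[Int, Seq[Int]] = {
    require(workers > 0, "need at least one worker")
    shards.groupBy(shard => shard % workers)
  }

  def main(args: Array[String]): Unit = {
    // Five shards, two workers: each worker reads more than one shard.
    val plan = assign(0 until 5, 2)
    plan.toSeq.sortBy(_._1).foreach { case (worker, shardIds) =>
      println(s"worker $worker reads shards ${shardIds.mkString(", ")}")
    }
    // prints:
    // worker 0 reads shards 0, 2, 4
    // worker 1 reads shards 1, 3
  }
}
```

In this model nothing passes through a central node: each worker only ever touches its own list of shards, which is the point of the comparison with HDFS.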

On 12/8/14 4:59 PM, Mohamed Lrhazi wrote:

> When and where is data being transferred from ES? is it all collected on the Spark master node, then partitioned and
> sent to the worker nodes? or is each worker node talking to ES to somehow get a partition of the data?

--
Costin

Great, thanks a lot Costin.

Are people supposed to deploy the Spark workers on the same machines as the
ES cluster? I guess it would make sense for data to remain local and avoid
network transfers altogether?

Thanks a lot,
Mohamed.

On Monday, December 8, 2014 10:19:12 AM UTC-5, Costin Leau wrote:

> Behind the scenes we map each worker to an index shard (if there aren't
> enough workers, then some will work across multiple shards).

Hi,

You recommend the native integration instead of MR, but I see in the
official documentation (Spark support doc) that MR is recommended for
reading/writing data to ES using Spark:
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/2.1.Beta/spark.html

What would be the basic piece of code to read data from ES without using MR?

I'm currently struggling with the EsInputFormat[org.apache.hadoop.io.Text,
MapWritable] structure.

My code is:

val sc = new SparkContext(...)

val configuration = new Configuration()
configuration.set("es.nodes", "xxxxxx")
configuration.set("es.port", "9200")
configuration.set("es.resource", resource) // my index/type
configuration.set("es.query", query) // basically a match_all

val esRDD = sc.newAPIHadoopRDD(configuration,
  classOf[EsInputFormat[org.apache.hadoop.io.Text, MapWritable]],
  classOf[org.apache.hadoop.io.Text], classOf[MapWritable])

Assume my data is mapped as follows:

{
  "oceannetworks": {
    "mappings": {
      "transcript": {
        "properties": {
          "cruiseID": {
            "type": "string"
          },
          "diveID": {
            "type": "string"
          },
          "filename_root": {
            "type": "string"
          },
          "id": {
            "type": "string"
          },
          "result": {
            "type": "nested",
            "properties": {
              "begin_time": {
                "type": "double"
              },
              "confidence": {
                "type": "double"
              },
              "end_time": {
                "type": "double"
              },
              "location": {
                "type": "geo_point"
              },
              "word": {
                "type": "string"
              }
            }
          },
          "status": {
            "type": "string"
          },
          "uuid": {
            "type": "string"
          },
          "version": {
            "type": "string"
          }
        }
      }
    }
  }
}

I'm able to retrieve first-level fields such as diveID and cruiseID, but
it's not clear how to get the second-level collection "result". It seems I
get a WritableArrayWritable, but I'm not sure how to handle it.

I get first-level data with this kind of code:

val uuids = esRDD.map(_._2.get(new org.apache.hadoop.io.Text("uuid")).toString).take(10)

I could use a little bit of help :)

Thanks.

chris

http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/2.1.Beta/spark.html#spark-native
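
As a rough illustration of what the native route could look like for the nested "result" field: the linked section describes importing `org.elasticsearch.spark._` and calling `sc.esRDD(resource, query)`, which yields each document as a Scala map rather than a MapWritable. The sketch below is plain Scala and runs without Spark or ES. It assumes that a nested field arrives as a collection of maps (or a single map); that representation, and the names `NestedResultExample`, `Doc`, `wordOf` and `words`, are assumptions for illustration and should be checked against what the connector actually returns.

```scala
object NestedResultExample {
  // Assumed document shape: field name -> value, nested objects as maps.
  type Doc = scala.collection.Map[String, Any]

  // Read the "word" field of one nested entry, if present.
  private def wordOf(entry: scala.collection.Map[_, _]): Option[String] =
    entry.asInstanceOf[Doc].get("word").map(_.toString)

  // Collect the "word" of every entry under the nested "result" field.
  def words(doc: Doc): Seq[String] = doc.get("result") match {
    // A single nested object (a map is itself iterable, so match it first).
    case Some(single: scala.collection.Map[_, _]) => wordOf(single).toSeq
    // Several nested objects.
    case Some(entries: Iterable[_]) =>
      entries.toSeq.flatMap {
        case entry: scala.collection.Map[_, _] => wordOf(entry)
        case _                                 => None
      }
    case _ => Seq.empty
  }

  def main(args: Array[String]): Unit = {
    // Hand-built sample standing in for one document read from ES.
    val sample: Doc = Map(
      "uuid" -> "abc-123",
      "result" -> Seq(
        Map("word" -> "hello", "confidence" -> 0.9),
        Map("word" -> "ocean", "confidence" -> 0.7)))
    println(words(sample).mkString(", ")) // prints: hello, ocean
    // With the connector on the classpath, the documents would instead come
    // from something like sc.esRDD("oceannetworks/transcript").map(_._2)
  }
}
```

The same `words` function could then be applied inside a `map` or `flatMap` over the RDD of documents.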

On 12/18/14 8:27 PM, chris wrote:

> what would be the basic piece of code to read data from ES without using MR ?

--
Costin
