Question about Elasticsearch and Spark

Hi:
I have a very simple application that queries an ES instance and returns
the count of documents found by the query. I am using the Spark interface
as I intend to run ML algorithms on the result set. With that said, here
are the problems I face:

  1. If I set up the Configuration (to use with newAPIHadoopRDD) or JobConf
    (to use with hadoopRDD) to point at a remote ES instance, like so:

This is using the newAPIHadoopRDD interface:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.elasticsearch.hadoop.mr.EsInputFormat

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TestESSpark")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)

val conf = new Configuration // change to new JobConf for the old API
conf.set("es.nodes", "remote.server:port")
conf.set("es.resource", "index/type")
conf.set("es.query", """{"query":{"match_all":{}}}""")
val esRDD = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
// change to hadoopRDD for the old API
val docCount = esRDD.count
println(docCount)

The application just hangs at the println (basically while executing the
search, or so I think).
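
For what it's worth, a quick way to sanity-check that the query itself comes
back fast, independent of the connector's scroll, is to hit the _count REST
endpoint directly. This is only a sketch; "remote.server", the 9200 port and
"index/type" are placeholders for my actual instance:

import scala.io.Source

// Ask ES for the document count over plain HTTP (placeholders for host/index).
val countJson = Source.fromURL("http://remote.server:9200/index/type/_count", "UTF-8").mkString
println(countJson) // e.g. {"count":12345,"_shards":{...}}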

  2. If I use localhost instead of "remote.server:port" for es.nodes, the
    application throws an exception:
    Exception in thread "main"
    org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection
    error (check network and/or proxy settings)- all nodes failed; tried
    [[localhost:9200]]
    at
    org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:123)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:303)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:287)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:291)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:118)
    at
    org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:100)
    at
    org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:57)
    at
    org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:220)
    at
    org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:406)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
    at org.apache.spark.rdd.RDD.count(RDD.scala:904)
    at
    trgr.rd.newsplus.pairgen.ElasticSparkTest1$.main(ElasticSparkTest1.scala:59)
    at trgr.rd.newsplus.pairgen.ElasticSparkTest1.main(ElasticSparkTest1.scala)

I am using version 2.1.0.Beta2 of the elasticsearch-hadoop library, running
against a local ES instance (version 1.3.2) and a remote ES instance
(version 1.0.0).

Any insight as to what I might be missing or doing wrong?

Thanks

Ramdev

An update on this:

The exception mentioned in item 2 of my original post was due to the ES
instance being down (and for some reason I failed to realise that).
That said, I am still having trouble with item 1. The following questions
came up:

  1. Is there a correlation between the number of shards/replicas on the ES
    instance and the number of shard-splits that are created for the query
    request (see the sketch after this list)? And
  2. if the ES instance has a single shard and a fairly large number of
    documents, would the performance be slower?
  3. Are there any network latency issues? (I am able to query the instance
    using the Sense/head plugins, and the response time is not bad; it's
    approximately 28 ms.)
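
To gather data for question 1, here is a minimal sketch that pulls the index
settings so the number of primary shards can be compared against the
shard-split count; "remote.server" and "index" are placeholders for my actual
instance and index:

import scala.io.Source

// Fetch the index settings and look for "number_of_shards" in the response.
val settingsJson = Source.fromURL("http://remote.server:9200/index/_settings", "UTF-8").mkString
println(settingsJson)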

The reason for question 1 is the following output from the run against the
remote instance:

6738 [main] INFO org.elasticsearch.hadoop.mr.EsInputFormat - Created [2]
shard-splits
6780 [main] INFO org.apache.spark.SparkContext - Starting job: count at
ElasticSparkTest1.scala:59
6801 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at
ElasticSparkTest1.scala:59) with 2 output partitions (allowLocal=false)
6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at
ElasticSparkTest1.scala:59)
6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
6808 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
6818 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0
(NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which
has no missing parents
6853 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with
curMem=34372, maxMem=503344005
6854 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values
in memory (estimated size 1568.0 B, free 480.0 MB)
6870 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting 2 missing tasks from
Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
6872 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 2
tasks
6912 [sparkDriver-akka.actor.default-dispatcher-2] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0
(TID 0, localhost, ANY, 18521 bytes)
6917 [sparkDriver-akka.actor.default-dispatcher-2] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0
(TID 1, localhost, ANY, 18521 bytes)
6923 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
6923 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
6958 [Executor task launch worker-0] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=1]
6958 [Executor task launch worker-1] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=0]
6998 [Executor task launch worker-0] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
6998 [Executor task launch worker-1] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...

I noticed only two shard-splits being created.

On the other hand, when I run the application against localhost with default
settings, this is what I get:
4960 [main] INFO org.elasticsearch.hadoop.mr.EsInputFormat - Created [5]
shard-splits
5002 [main] INFO org.apache.spark.SparkContext - Starting job: count at
ElasticSparkTest1.scala:59
5022 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at
ElasticSparkTest1.scala:59) with 5 output partitions (allowLocal=false)
5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at
ElasticSparkTest1.scala:59)
5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
5030 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
5040 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0
(NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which
has no missing parents
5075 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with
curMem=34340, maxMem=511377408
5076 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values
in memory (estimated size 1568.0 B, free 487.7 MB)
5092 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting 5 missing tasks from
Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
5094 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 5
tasks
5133 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0
(TID 0, localhost, ANY, 16090 bytes)
5138 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0
(TID 1, localhost, ANY, 16090 bytes)
5140 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 0.0
(TID 2, localhost, ANY, 16090 bytes)
5141 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 3.0 in stage 0.0
(TID 3, localhost, ANY, 16090 bytes)
5142 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 4.0 in stage 0.0
(TID 4, localhost, ANY, 16090 bytes)
5149 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
5149 [Executor task launch worker-2] INFO
org.apache.spark.executor.Executor - Running task 2.0 in stage 0.0 (TID 2)
5149 [Executor task launch worker-4] INFO
org.apache.spark.executor.Executor - Running task 4.0 in stage 0.0 (TID 4)
5149 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
5149 [Executor task launch worker-3] INFO
org.apache.spark.executor.Executor - Running task 3.0 in stage 0.0 (TID 3)
5186 [Executor task launch worker-4] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=3]
5186 [Executor task launch worker-2] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=2]
5186 [Executor task launch worker-1] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=1]
5186 [Executor task launch worker-3] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=4]
5186 [Executor task launch worker-0] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=0]
5234 [Executor task launch worker-2] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5234 [Executor task launch worker-4] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5234 [Executor task launch worker-1] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5236 [Executor task launch worker-3] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5236 [Executor task launch worker-0] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
7390 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Finished task 1.0 in stage 0.0 (TID
1). 1652 bytes result sent to driver
7390 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID
0). 1652 bytes result sent to driver
7390 [Executor task launch worker-4] INFO
org.apache.spark.executor.Executor - Finished task 4.0 in stage 0.0 (TID
4). 1652 bytes result sent to driver
7390 [Executor task launch worker-2] INFO
org.apache.spark.executor.Executor - Finished task 2.0 in stage 0.0 (TID
2). 1652 bytes result sent to driver
7390 [Executor task launch worker-3] INFO
org.apache.spark.executor.Executor - Finished task 3.0 in stage 0.0 (TID
3). 1652 bytes result sent to driver
7410 [Result resolver thread-3] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0
(TID 0) in 2276 ms on localhost (1/5)
7417 [Result resolver thread-0] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 0.0
(TID 4) in 2269 ms on localhost (2/5)
7424 [Result resolver thread-1] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 3.0 in stage 0.0
(TID 3) in 2277 ms on localhost (3/5)
7430 [Result resolver thread-2] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 0.0
(TID 2) in 2285 ms on localhost (4/5)
7437 [Result resolver thread-3] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 1.0 in stage 0.0
(TID 1) in 2293 ms on localhost (5/5)

Thanks for the help

Ramdev

Ramdev,

Do your Spark nodes all have access to all of the ES nodes on port 9200?
The connector joins the ES cluster and will query multiple nodes, not just
the node configured in the JobConf. This bit me in my tests, as I was
pointing Spark at a load balancer and hadn't given it direct access to the
nodes.
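
If giving the Spark machine direct access to every node isn't an option, one
alternative (a sketch only, and assuming your elasticsearch-hadoop version
supports the es.nodes.discovery setting) is to pin the connector to the
configured address:

// Restrict the connector to the address listed in es.nodes instead of
// discovering and contacting the rest of the cluster ("remote.server:9200"
// is a placeholder).
val conf = new org.apache.hadoop.conf.Configuration()
conf.set("es.nodes", "remote.server:9200")
conf.set("es.resource", "index/type")
conf.set("es.nodes.discovery", "false")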

Also, ES 1.0.0 is getting a bit long in the tooth. I'm not sure that's a
tested combination for the connector. I've only personally used the
connector against the 1.3.x train.

David
