An update on this:
The exception mentioned in item 2 of my original post was due to the ES
instance being down (and for some reason I failed to realise that).
That said, I am still having trouble with item 1. The following questions
came up:
- Is there a correlation between the number of shards/replicas on the ES
instance and the number of shard-splits that are created for the query
request? (See the sketch after this list.)
- If the ES instance has a single shard and a fairly large number of
documents, would the performance be slower?
- Are there any network latency issues? (I am able to query the instance
using the Sense/head plugins, and the response time is not bad, roughly
28ms.)
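Regarding question 1, my understanding is that es-hadoop creates one shard-split per primary shard of the target index (which would also explain the 5 splits I see against a default localhost index further below). A minimal sketch to confirm the shard count over the index settings API; "remote.server:port" and "index" are placeholders for my actual host and index:

import scala.io.Source

// Fetch the index settings and look for "number_of_shards" in the JSON response.
// "remote.server:port" and "index" are placeholders, not my real host/index.
object CheckShardCount {
  def main(args: Array[String]): Unit = {
    val settings = Source.fromURL("http://remote.server:port/index/_settings").mkString
    println(settings)
  }
}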
The reason for question 1 is the following log output:
6738 [main] INFO org.elasticsearch.hadoop.mr.EsInputFormat - Created [2]
shard-splits
6780 [main] INFO org.apache.spark.SparkContext - Starting job: count at
ElasticSparkTest1.scala:59
6801 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at
ElasticSparkTest1.scala:59) with 2 output partitions (allowLocal=false)
6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at
ElasticSparkTest1.scala:59)
6802 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
6808 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
6818 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0
(NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which
has no missing parents
6853 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with
curMem=34372, maxMem=503344005
6854 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values
in memory (estimated size 1568.0 B, free 480.0 MB)
6870 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting 2 missing tasks from
Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
6872 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 2
tasks
6912 [sparkDriver-akka.actor.default-dispatcher-2] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0
(TID 0, localhost, ANY, 18521 bytes)
6917 [sparkDriver-akka.actor.default-dispatcher-2] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0
(TID 1, localhost, ANY, 18521 bytes)
6923 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
6923 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
6958 [Executor task launch worker-0] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=1]
6958 [Executor task launch worker-1] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[ZIbTPE4FSxigrYkomftWQw/Strobe|192.189.224.80:9600],shard=0]
6998 [Executor task launch worker-0] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
6998 [Executor task launch worker-1] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
I noticed only two shard-splits being created.
On the other hand, when I run the application against localhost with
default settings, this is what I get:
4960 [main] INFO org.elasticsearch.hadoop.mr.EsInputFormat - Created [5]
shard-splits
5002 [main] INFO org.apache.spark.SparkContext - Starting job: count at
ElasticSparkTest1.scala:59
5022 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Got job 0 (count at
ElasticSparkTest1.scala:59) with 5 output partitions (allowLocal=false)
5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Final stage: Stage 0(count at
ElasticSparkTest1.scala:59)
5023 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
5030 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
5040 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting Stage 0
(NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57), which
has no missing parents
5075 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - ensureFreeSpace(1568) called with
curMem=34340, maxMem=511377408
5076 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values
in memory (estimated size 1568.0 B, free 487.7 MB)
5092 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.DAGScheduler - Submitting 5 missing tasks from
Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at ElasticSparkTest1.scala:57)
5094 [sparkDriver-akka.actor.default-dispatcher-5] INFO
org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 5
tasks
5133 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0
(TID 0, localhost, ANY, 16090 bytes)
5138 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0
(TID 1, localhost, ANY, 16090 bytes)
5140 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 0.0
(TID 2, localhost, ANY, 16090 bytes)
5141 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 3.0 in stage 0.0
(TID 3, localhost, ANY, 16090 bytes)
5142 [sparkDriver-akka.actor.default-dispatcher-4] INFO
org.apache.spark.scheduler.TaskSetManager - Starting task 4.0 in stage 0.0
(TID 4, localhost, ANY, 16090 bytes)
5149 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
5149 [Executor task launch worker-2] INFO
org.apache.spark.executor.Executor - Running task 2.0 in stage 0.0 (TID 2)
5149 [Executor task launch worker-4] INFO
org.apache.spark.executor.Executor - Running task 4.0 in stage 0.0 (TID 4)
5149 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
5149 [Executor task launch worker-3] INFO
org.apache.spark.executor.Executor - Running task 3.0 in stage 0.0 (TID 3)
5186 [Executor task launch worker-4] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=3]
5186 [Executor task launch worker-2] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=2]
5186 [Executor task launch worker-1] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=1]
5186 [Executor task launch worker-3] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=4]
5186 [Executor task launch worker-0] INFO
org.apache.spark.rdd.NewHadoopRDD - Input split: ShardInputSplit
[node=[KsYIAPqDSDafYjZJPh4CeQ/Silver Surfer|10.208.8.28:9200],shard=0]
5234 [Executor task launch worker-2] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5234 [Executor task launch worker-4] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5234 [Executor task launch worker-1] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5236 [Executor task launch worker-3] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
5236 [Executor task launch worker-0] WARN
org.elasticsearch.hadoop.mr.EsInputFormat - Cannot determine task id...
7390 [Executor task launch worker-1] INFO
org.apache.spark.executor.Executor - Finished task 1.0 in stage 0.0 (TID
1). 1652 bytes result sent to driver
7390 [Executor task launch worker-0] INFO
org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID
0). 1652 bytes result sent to driver
7390 [Executor task launch worker-4] INFO
org.apache.spark.executor.Executor - Finished task 4.0 in stage 0.0 (TID
4). 1652 bytes result sent to driver
7390 [Executor task launch worker-2] INFO
org.apache.spark.executor.Executor - Finished task 2.0 in stage 0.0 (TID
2). 1652 bytes result sent to driver
7390 [Executor task launch worker-3] INFO
org.apache.spark.executor.Executor - Finished task 3.0 in stage 0.0 (TID
3). 1652 bytes result sent to driver
7410 [Result resolver thread-3] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0
(TID 0) in 2276 ms on localhost (1/5)
7417 [Result resolver thread-0] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 0.0
(TID 4) in 2269 ms on localhost (2/5)
7424 [Result resolver thread-1] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 3.0 in stage 0.0
(TID 3) in 2277 ms on localhost (3/5)
7430 [Result resolver thread-2] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 0.0
(TID 2) in 2285 ms on localhost (4/5)
7437 [Result resolver thread-3] INFO
org.apache.spark.scheduler.TaskSetManager - Finished task 1.0 in stage 0.0
(TID 1) in 2293 ms on localhost (5/5)
Thanks for the help
Ramdev
On Wednesday, 22 October 2014 11:58:02 UTC-5, Ramdev Wudali wrote:
Hi:
I have a very simple application that queries an ES instance and
returns the count of documents found by the query. I am using the Spark
interface as I intend to run ML algorithms on the result set. With that
said, here are the problems I face:
- If I set up the Configuration (to use with newAPIHadoopRDD) or JobConf
(to use with hadoopRDD) against a remote ES instance like so (this is
using the newAPIHadoopRDD interface):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.elasticsearch.hadoop.mr.EsInputFormat

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TestESSpark")
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)

val conf = new Configuration // change to new JobConf for the old API
conf.set("es.nodes", "remote.server:port")
conf.set("es.resource", "index/type")
conf.set("es.query", """{"query":{"match_all":{}}}""")

val esRDD = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable]) // change to hadoopRDD for the old API
val docCount = esRDD.count
println(docCount)
The application just hangs at the println (basically while executing the
search, or so I think).
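For what it's worth, my understanding is that the same count could also be expressed through the Spark-native API that elasticsearch-hadoop ships. A sketch only, assuming the native Spark support in this 2.1.0.Beta2 build exposes esRDD this way; "remote.server:port" and "index/type" are the same placeholders as above:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // assumed to add esRDD to SparkContext

val nativeConf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("TestESSparkNative")
  .set("es.nodes", "remote.server:port") // same placeholder as above
val nativeSc = new SparkContext(nativeConf)

// Count the match_all result set directly as an RDD of documents.
val nativeCount = nativeSc.esRDD("index/type", """{"query":{"match_all":{}}}""").count()
println(nativeCount)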
- If I use localhost instead of "remote.server:port" for es.nodes, the
application throws an exception:
Exception in thread "main" org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[localhost:9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:123)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:303)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:287)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:291)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:118)
at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:100)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:57)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:220)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:406)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
at org.apache.spark.rdd.RDD.count(RDD.scala:904)
at trgr.rd.newsplus.pairgen.ElasticSparkTest1$.main(ElasticSparkTest1.scala:59)
at trgr.rd.newsplus.pairgen.ElasticSparkTest1.main(ElasticSparkTest1.scala)
I am using version 2.1.0.Beta2 of the elasticsearch-hadoop library,
running against a local ES instance (version 1.3.2) and a remote ES
instance (version 1.0.0).
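For reference, this is roughly the dependency I have in mind in an sbt build (a sketch; I'm assuming the plain elasticsearch-hadoop artifact rather than a Spark-specific module):

// build.sbt sketch; assuming the plain elasticsearch-hadoop artifact
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.Beta2"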
Any insight as to what I might be missing or doing wrong?
Thanks
Ramdev