I am trying to query Elasticsearch from Apache Spark using
elasticsearch-hadoop. For now I just want to run a query against the
Elasticsearch server and return the number of results.

Below is my test class, using the Java API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.serializer.KryoSerializer;
    import org.elasticsearch.hadoop.mr.EsInputFormat;
    import scala.Tuple2;

    public class ElasticsearchSparkQuery {

        public static int query(String masterUrl, String elasticsearchHostPort) {
            SparkConf sparkConfig = new SparkConf().setAppName("ESQuery").setMaster(masterUrl);
            sparkConfig.set("spark.serializer", KryoSerializer.class.getName());
            JavaSparkContext sparkContext = new JavaSparkContext(sparkConfig);

            Configuration conf = new Configuration();
            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
            conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
            conf.set("es.nodes", elasticsearchHostPort);
            conf.set("es.resource", "media/docs");
            conf.set("es.query", "?q=*");

            JavaPairRDD<Text, MapWritable> esRDD =
                    sparkContext.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class);
            return (int) esRDD.count();
        }
    }

When I try to run this I get the following error:

14/07/04 14:58:07 INFO executor.Executor: Running task ID 0
14/07/04 14:58:07 INFO storage.BlockManager: Found block broadcast_0 locally
14/07/04 14:58:07 INFO rdd.NewHadoopRDD: Input split: ShardInputSplit [node=[5UATWUzmTUuNzhmGxXWy_w/S'byll|10.45.71.152:9200],shard=0]
14/07/04 14:58:07 WARN mr.EsInputFormat: Cannot determine task id...
14/07/04 14:58:07 ERROR executor.Executor: Exception in task ID 0
java.lang.NoSuchFieldError: ALLOW_UNQUOTED_FIELD_NAMES
    at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.(JacksonJsonParser.java:38)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:75)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:267)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:75)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.next(EsInputFormat.java:319)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.nextKeyValue(EsInputFormat.java:255)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1014)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

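
A NoSuchFieldError generally means the class found at runtime is a different version from the one the code was compiled against, so my working guess (an assumption, not something I have confirmed) is that a different Jackson jar is being picked up on the Spark classpath. As a rough way to check, here is a small sketch that reports which jar a given class is loaded from. The ClassOrigin class and its locate method are hypothetical helpers written only for this check, and the Jackson class name to pass in (I would try org.codehaus.jackson.JsonParser) is also my assumption.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /**
     * Returns the location (usually a jar URL) the named class is loaded from,
     * or a short note when that cannot be determined.
     */
    public static String locate(String className) {
        try {
            // initialize=false: look the class up without running its static initializers
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "(no code source; likely a JDK class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Pass one or more fully qualified class names as arguments
        for (String name : args) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Running this with the same classpath as the Spark job and passing the Jackson class name should show which jar wins; calling locate from inside a task would show what the executor sees.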
Has anyone run into this issue with the JacksonJsonParser?