Hello,
I am currently working on a project where I do some fuzzy matching against data in an Elasticsearch index. Because I have millions of records in a Python dataframe to match against millions of documents in the Elasticsearch index, it is taking quite a while, as I have to go through each record in the dataframe one at a time.
So, I thought of using Spark's distributed computing power. My aim is this: Spark splits the dataframe across different executors, and each executor queries the Elasticsearch index, which should increase the overall speed. Hence, my work with ES-Hadoop.
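For reference, this is the kind of parallel lookup I have in mind (a hypothetical sketch: the CSV path, field name, and fuzzy query are placeholders, and it uses the plain elasticsearch-py client just to illustrate the shape of it, not ES-Hadoop):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("fuzzy-match").getOrCreate()

# Placeholder input: the real data is the dataframe with millions of records.
records = spark.read.csv("records.csv", header=True)

def match_partition(rows):
    # Each executor opens its own client; a client can't be shipped from the driver.
    from elasticsearch import Elasticsearch
    es = Elasticsearch(["http://xxxxx.net:9223"])
    for row in rows:
        # Placeholder fuzzy query; the real matching logic would go here.
        resp = es.search(index="client_index_multilang",
                         body={"query": {"fuzzy": {"name": row["name"]}}})
        yield (row["name"], resp["hits"]["total"])

matches = records.rdd.mapPartitions(match_partition)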
I downloaded the binaries and ran the following, but got the error below.
./bin/pyspark --driver-class-path=/Users/xx/ES-Hadoop/elasticsearch-hadoop-6.0.0/dist
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/
Using Python version 2.7.13 (default, Dec 18 2016 07:03:39)
SparkSession available as 'spark'.
conf = {"es.nodes":"http://xxxxx.net","es.port":9223,"es.resource":"client_index_multilang"}
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
Traceback (most recent call last):
File "", line 1, in
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/context.py", line 702, in newAPIHadoopRDD
jconf, batchSize)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$mapToConf$1.apply(PythonHadoopUtil.scala:160)
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$mapToConf$1.apply(PythonHadoopUtil.scala:160)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.api.python.PythonHadoopUtil$.mapToConf(PythonHadoopUtil.scala:160)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:580)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
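From the trace, the failure is in PythonHadoopUtil.mapToConf, which copies the conf dict into a Hadoop Configuration, so my guess is that every value must already be a string, and the integer 9223 is what triggers the ClassCastException. If that's right, the call would presumably need to look like this (just my assumption, not verified):

conf = {"es.nodes": "http://xxxxx.net",
        "es.port": "9223",  # quoted, since the Configuration seems to expect string values
        "es.resource": "client_index_multilang"}
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                         "org.apache.hadoop.io.NullWritable",
                         "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                         conf=conf)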
Does anyone see what I am getting wrong?
Spark version: 2.2.0
Elasticsearch version: 2.2.1
No Hadoop.
Thanks.