ES-Hadoop PySpark error


I am currently working on a project where I do some fuzzy matching on data in an Elasticsearch index. Because I have millions of records in a python dataframe to match against millions of documents in the Elasticsearch index, it is taking quite a long time, since I have to go through each record in the dataframe one by one.
So, I thought of using Spark's distributed computing power. My aim is this: Spark splits the dataframe across different executors, and each executor queries the Elasticsearch index. This should increase the overall speed. Hence, my work with ES-Hadoop.

I have downloaded the binaries and I ran the following but got the error below. Any idea what I am getting wrong?
./bin/pyspark --driver-class-path=/Users/xx/ES-Hadoop/elasticsearch-hadoop-6.0.0/dist
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.13 (default, Dec 18 2016 07:03:39)
SparkSession available as 'spark'.

conf = {"es.nodes":"","es.port":9223,"es.resource":"client_index_multilang"}
rdd = sc.newAPIHadoopRDD("", "", "", conf=conf)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/", line 702, in newAPIHadoopRDD
jconf, batchSize)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/", line 1133, in __call__
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/sql/", line 63, in deco
return f(*a, **kw)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$mapToConf$1.apply(PythonHadoopUtil.scala:160)
at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$mapToConf$1.apply(PythonHadoopUtil.scala:160)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.api.python.PythonHadoopUtil$.mapToConf(PythonHadoopUtil.scala:160)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:580)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at py4j.reflection.MethodInvoker.invoke(
at py4j.reflection.ReflectionEngine.invoke(
at py4j.Gateway.invoke(
at py4j.commands.AbstractCommand.invokeMethod(
at py4j.commands.CallCommand.execute(

Anyone see what I am getting wrong?
Spark version: 2.2.0
Elasticsearch version: 2.2.1
No Hadoop.


Could it be that you are specifying the port number as an Integer instead of a String?
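That matches the `ClassCastException: java.lang.Integer cannot be cast to java.lang.String` in the traceback: Spark's `PythonHadoopUtil.mapToConf` casts every conf value to a Java `String`, so every value in the dict must be a Python string. A minimal sketch of the corrected conf is below (the node address is an assumption, since it was left blank in the question; the validation check is just for illustration):

```python
# All ES-Hadoop conf values must be strings: Spark's mapToConf casts each
# value to java.lang.String, and an Integer port triggers the ClassCastException.
conf = {
    "es.nodes": "localhost",  # assumption: the node address was left blank in the question
    "es.port": "9223",        # string, not the integer 9223
    "es.resource": "client_index_multilang",
}

# Guard against the exact mistake from the traceback: any non-string value
# would reach the JVM as an Integer and fail the cast to String.
non_strings = {k: v for k, v in conf.items() if not isinstance(v, str)}
assert not non_strings, "non-string conf values: %r" % non_strings
```

The `newAPIHadoopRDD` call itself would then use the ES-Hadoop Map/Reduce classes rather than empty strings, along the lines of (again a sketch, assuming a live `sc` and cluster):

```
rdd = sc.newAPIHadoopRDD(
    "org.elasticsearch.hadoop.mr.EsInputFormat",
    "org.apache.hadoop.io.NullWritable",
    "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf)
```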


So it should be `"es.port": "9223"` instead?


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.