How to write to ES from a pyspark dataframe?


(NCM) #1

I am not having any luck figuring this out. The docs show code for reading from ES, but nothing on writing to ES. Can someone provide some documentation or examples?


(eliasah) #2

Which version of Spark are you using?


(NCM) #3

Thanks for the reply. I am using the spark-1.5.1-bin-hadoop2.6.tgz version downloaded from the Spark site. The setup seems to work fine; I am just not sure what the code to write to ES would look like.


(NCM) #4

What I found for writing to ES from pyspark is to use saveAsNewAPIHadoopFile, as shown below.

However, this does not work for me with a DataFrame.

> type(query)
pyspark.sql.dataframe.DataFrame
> conf = {"es.resource.write": "logs/messages",
          "es.nodes": "elasticsearch.domain.com:9200"}
> query.rdd.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=conf)

I get this error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 254, 10.244.10.114): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
...

(Costin Leau) #5

I'm afraid my pyspark experience is not vast enough to answer this question. It seems to be related to a class in Spark that cannot be properly created, but I'm unclear why. Maybe the Spark list can provide more information...
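
That said, judging from the stack trace, the failure seems to come from the pickled Row objects: the JVM side cannot reconstruct pyspark.sql.types._create_row. A workaround that is often suggested (untested on my end, so treat it as a sketch) is to map each Row to a plain Python dict before saving, so the values arrive on the Java side as ordinary maps:

# Same `query` DataFrame and `conf` dict as in the post above.
# The key in each (key, value) pair is ignored because keyClass is
# NullWritable; row.asDict() assumes a flat schema (no nested Rows).
docs = query.rdd.map(lambda row: ('ignored', row.asDict()))

docs.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf)

If the dict values themselves fail to convert (dates, nested structures), another commonly mentioned option is to serialize each row to a JSON string and add "es.input.json" : "true" to the conf so ES-Hadoop parses the documents itself.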

