Saving a DataFrame to Elasticsearch using Python

Hi

I am desperately trying to write a DataFrame to Elasticsearch...
I have tried many different configurations, but I haven't found the right one.

Reading works fine:

[root@client ~]# cat test1.py
## imports

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

## constants
APP_NAME="test"

if __name__ == "__main__":

  conf = SparkConf()
  conf.set("es.nodes", "10.120.0.218:9200")

  sc = SparkContext(conf=conf)
  sqlContext = SQLContext(sc)

  df = sqlContext.read \
      .option("es.resource", "test/member") \
      .format("org.elasticsearch.spark.sql") \
      .load()
  df.show()

  sc.stop()

[root@client ~]# /usr/hdp/2.5.3.0-37/spark/bin/spark-submit --master yarn --deploy-mode cluster --jars hdfs:/spark-libs/elasticsearch-hadoop-5.2.1.jar --num-executors 1 --executor-cores 4 --executor-memory 32G --verbose test1.py

and the output is:

Log Type: stdout
Log Upload Time: Mon Feb 20 21:46:12 +0000 2017
Log Length: 106
+------+-----------+
|abo_id|   nickname|
+------+-----------+
|   123|supertomato|
+------+-----------+ 

... but trying to write the DataFrame back into the same index/type:

[root@client ~]# cat test1.py
## imports

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

## constants
APP_NAME="test"

if __name__ == "__main__":

  conf = SparkConf()
  conf.set("es.nodes", "10.120.0.218:9200")

  sc = SparkContext(conf=conf)
  sqlContext = SQLContext(sc)

  df = sqlContext.read \
      .option("es.resource", "test/member") \
      .format("org.elasticsearch.spark.sql") \
      .load()
  df.show()

  df.write\
    .format("org.elasticsearch.spark.sql")\
    .mode('append')\
    .option("spark.es.resource","test/member")\
    .save()

  sc.stop()

fails with the following error:

Traceback (most recent call last):
  File "test1.py", line 28, in <module>
    .option("spark.es.resource","test/member")\
  File "/hadoop/yarn/local/usercache/root/appcache/application_1486137522197_0190/container_e05_1486137522197_0190_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 395, in save
  File "/hadoop/yarn/local/usercache/root/appcache/application_1486137522197_0190/container_e05_1486137522197_0190_01_000001/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/hadoop/yarn/local/usercache/root/appcache/application_1486137522197_0190/container_e05_1486137522197_0190_01_000001/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "/hadoop/yarn/local/usercache/root/appcache/application_1486137522197_0190/container_e05_1486137522197_0190_01_000001/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o57.save.
: java.lang.AbstractMethodError: org.elasticsearch.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/DataFrame;)Lorg/apache/spark/sql/sources/BaseRelation;
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)

Any ideas? Thanks in advance...

@easyoups This looks similar to this issue. Could you see if that solves your problem?
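
For what it's worth, an AbstractMethodError on DefaultSource.createRelation usually points at a Spark version mismatch rather than a wrong option key: HDP 2.5.3 ships Spark 1.6, while the elasticsearch-hadoop-5.2.1 uber jar is compiled against the Spark 2.x data source API, whose write-side createRelation signature differs from Spark 1.6's. That would also explain why reading works: the read-side createRelation signature is the same across both Spark lines. Below is a minimal sketch of what I would try, assuming the Spark 1.x flavour of the connector is shipped instead of the uber jar (elasticsearch-spark-13_2.10-5.2.1.jar is my best guess at the artifact name; please verify it against the elasticsearch-hadoop 5.2.1 release), and using the plain es.resource key, which is the documented one for read/write options (the spark. prefix is, as far as I know, intended for properties set on the SparkConf, like your conf.set line):

  ## Submit with the Spark 1.x connector artifact instead of the uber jar,
  ## e.g. (jar name to be verified):
  ##   /usr/hdp/2.5.3.0-37/spark/bin/spark-submit --master yarn --deploy-mode cluster \
  ##       --jars hdfs:/spark-libs/elasticsearch-spark-13_2.10-5.2.1.jar test1.py

  ## Write the DataFrame back to the same index/type through the
  ## connector's Spark SQL data source:
  df.write \
      .format("org.elasticsearch.spark.sql") \
      .mode('append') \
      .option("es.resource", "test/member") \
      .save()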
