I'm trying to create a hello-world-style example that uses Spark to save data into Elasticsearch (using the elasticsearch-spark package 2.3.1) and then read it back again.
The example works fine on Spark 1.5, but when I try to run it against Spark 1.6, I hit the error in the subject line.
My code looks like this:
snowch$ cat export_to_elasticsearch.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
if __name__ == "__main__":
if len(sys.argv) != 6:
print("Usage: export_to_elasticsearch.py <host> <port> <user> <pass> <path>", file=sys.stderr)
exit(-1)
host = sys.argv[1]
port = sys.argv[2]
user = sys.argv[3]
password = sys.argv[4]
path = sys.argv[5]
conf = SparkConf().setAppName("Elasticsearch example")
# see https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
conf.set("es.nodes",host)
conf.set("es.port",str(port))
conf.set("es.net.http.auth.user",user)
conf.set("es.net.http.auth.pass",password)
conf.set("es.net.ssl","true")
conf.set("es.net.ssl.truststore.location","truststore.jks")
conf.set("es.net.ssl.truststore.pass","mypassword")
conf.set("es.nodes.wan.only","true")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# create a test document
d = [{'name': 'Alice', 'age': 1}]
df = sqlContext.createDataFrame(d)
# persist it to elasticsearch
df.write.format("org.elasticsearch.spark.sql").save("spark/{0}".format(path))
sc.stop()
and
snowch$ cat import_from_elasticsearch.py
from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
if __name__ == "__main__":
if len(sys.argv) != 6:
print("Usage: export_to_elasticsearch.py <host> <port> <user> <pass> <path>", file=sys.stderr)
exit(-1)
host = sys.argv[1]
port = sys.argv[2]
user = sys.argv[3]
password = sys.argv[4]
path = sys.argv[5]
conf = SparkConf().setAppName("Elasticsearch example")
# see https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
conf.set("es.nodes",host)
conf.set("es.port",str(port))
conf.set("es.net.http.auth.user",user)
conf.set("es.net.http.auth.pass",password)
conf.set("es.net.ssl","true")
conf.set("es.net.ssl.truststore.location","truststore.jks")
conf.set("es.net.ssl.truststore.pass","mypassword")
conf.set("es.nodes.wan.only","true")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# read the data from elasticsearch
esdata = sqlContext.read.format("es").load("spark/{0}".format(path))
print(esdata.rdd.take(10))
sc.stop()