Cannot find mapping for spark/test-1463342115226 - one is required before using Spark SQL

I'm trying to create a hello-world type example that uses Spark to save data into Elasticsearch (using the es-spark package 2.3.1) and then read it back again.

The example works fine on Spark 1.5, but when I try to run it against Spark 1.6, I hit the error in the subject line.

My code looks like this:

snowch$ cat export_to_elasticsearch.py

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

if __name__ == "__main__":
    if len(sys.argv) != 6:
        print("Usage: export_to_elasticsearch.py <host> <port> <user> <pass> <path>", file=sys.stderr)
        sys.exit(-1)

    host     = sys.argv[1]
    port     = sys.argv[2]
    user     = sys.argv[3]
    password = sys.argv[4]
    path     = sys.argv[5]

    conf = SparkConf().setAppName("Elasticsearch example")

    # see https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
    conf.set("es.nodes",host)
    conf.set("es.port",str(port))
    conf.set("es.net.http.auth.user",user)
    conf.set("es.net.http.auth.pass",password)
    conf.set("es.net.ssl","true")
    conf.set("es.net.ssl.truststore.location","truststore.jks")
    conf.set("es.net.ssl.truststore.pass","mypassword")
    conf.set("es.nodes.wan.only","true")

    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # create a test document
    d = [{'name': 'Alice', 'age': 1}]
    df = sqlContext.createDataFrame(d)

    # persist it to elasticsearch
    df.write.format("org.elasticsearch.spark.sql").save("spark/{0}".format(path))

    sc.stop()

and

snowch$ cat import_from_elasticsearch.py

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

if __name__ == "__main__":
    if len(sys.argv) != 6:
        print("Usage: export_to_elasticsearch.py <host> <port> <user> <pass> <path>", file=sys.stderr)
        sys.exit(-1)

    host       = sys.argv[1]
    port       = sys.argv[2]
    user       = sys.argv[3]
    password   = sys.argv[4]
    path       = sys.argv[5]

    conf = SparkConf().setAppName("Elasticsearch example")

    # see https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
    conf.set("es.nodes",host)
    conf.set("es.port",str(port))
    conf.set("es.net.http.auth.user",user)
    conf.set("es.net.http.auth.pass",password)
    conf.set("es.net.ssl","true")
    conf.set("es.net.ssl.truststore.location","truststore.jks")
    conf.set("es.net.ssl.truststore.pass","mypassword")
    conf.set("es.nodes.wan.only","true")

    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # read the data from elasticsearch
    esdata = sqlContext.read.format("es").load("spark/{0}".format(path))

    print(esdata.rdd.take(10))

    sc.stop()
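
For reference, I'm submitting both scripts roughly like this (the connector jar name here is just whatever matches your es-spark/es-hadoop download, and truststore.jks is expected in the working directory):

snowch$ spark-submit --jars elasticsearch-hadoop-2.3.1.jar export_to_elasticsearch.py <host> <port> <user> <pass> <path>
snowch$ spark-submit --jars elasticsearch-hadoop-2.3.1.jar import_from_elasticsearch.py <host> <port> <user> <pass> <path>

Incidentally, format("es") is just the short alias for "org.elasticsearch.spark.sql", and the es.* settings can also be passed per read/write via .option() rather than on the SparkConf; a sketch of the read written that way:

esdata = sqlContext.read.format("org.elasticsearch.spark.sql") \
    .option("es.nodes", host) \
    .option("es.port", str(port)) \
    .option("es.net.http.auth.user", user) \
    .option("es.net.http.auth.pass", password) \
    .option("es.nodes.wan.only", "true") \
    .load("spark/{0}".format(path))  # plus the es.net.ssl.* settings from above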

Ok, so the issue was that the type name had changed between test runs, and I was trying to read from a type that didn't exist.

The error message threw me because it made me think the type did exist.
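
In case it helps anyone else: a quick sanity check is to ask Elasticsearch which types the index actually has a mapping for (same credentials as above; -k is there because my setup uses a self-signed certificate):

snowch$ curl -k -u <user>:<pass> "https://<host>:<port>/spark/_mapping"

If the type you pass to load() doesn't appear in the response, the connector has no mapping to infer a schema from, which is what the error is actually saying.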

Hi Chris,

I am trying the same thing with elasticsearch-spark_2.11-2.3.3.jar, ES version 2.3.3, and Spark version 2.0.0. My problem is that I am able to read data, but when I try to write back to ES it throws an error.

Can you please tell me what the difference is between using elasticsearch-spark_2.11-2.3.3.jar and elasticsearch-hadoop-2.3.3.jar?

Here is the full Stack Trace:

py4j.protocol.Py4JJavaError: An error occurred while calling o34.save.
: java.lang.AbstractMethodError: org.elasticsearch.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation;
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)

Thanks,
Sumit

Since Scala 2.11 is not binary compatible with Scala 2.10, the Spark portion of the library is also released as a standalone jar, with builds for each Scala version.
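
For example, if you pull the connector in with --packages, pick the build matching the Scala version your Spark distribution was compiled against (coordinates from memory, so double-check them on Maven Central):

# Spark built against Scala 2.10
spark-submit --packages org.elasticsearch:elasticsearch-spark_2.10:2.3.3 your_script.py
# Spark built against Scala 2.11
spark-submit --packages org.elasticsearch:elasticsearch-spark_2.11:2.3.3 your_script.py

elasticsearch-hadoop-2.3.3.jar, by contrast, is the all-in-one artifact that bundles the MapReduce, Hive, Pig, and Spark integrations, among others, whereas the elasticsearch-spark jars contain only the Spark integration.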

@james.baiera Thanks for your quick reply.
So you mean to say that I can directly use elasticsearch-spark_2.11-2.3.3.jar for writing to ES indexes?

Thanks
Sumit

@james.baiera Can you please let me know why it throws an error when I try to write to ES indexes?
I already attached the stack trace earlier in this conversation.

Thanks,
Sumit

ES Hadoop does not support Spark 2.0 until version 5.0.0-alpha5. You'll need to download that version to interact with Spark 2.0. The AbstractMethodError in your stack trace is the symptom: Spark 2.0 changed the data source API (createRelation now takes a Dataset), so a connector compiled against Spark 1.x cannot be called by a Spark 2.0 runtime.
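
Once you're on that release, the submission itself doesn't change; only the connector jar does. A minimal sketch, assuming you've downloaded the standard 5.0.0-alpha5 distribution (check the exact jar name in your download):

spark-submit --jars elasticsearch-hadoop-5.0.0-alpha5.jar your_script.py <host> <port> <user> <pass> <path>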