Does the Elasticsearch Spark 6.1.2 connector have a Spark streaming sink?


(Kant Kodali) #1

Does the Elasticsearch Spark 6.1.2 connector have a Spark streaming sink?

I am using compile group: 'org.elasticsearch', name: 'elasticsearch-hadoop', version: '6.1.2'

I tried the following two approaches, and neither worked.

Example #1

              df
                .writeStream()
                .outputMode("update")
                .trigger(Trigger.ProcessingTime(1000))
                .format("es")
                .option("checkpointLocation", "/tmp")
                .start("test/hello");

java.lang.ClassNotFoundException: Failed to find data source: es. Please find packages at http://spark.apache.org/third-party-projects.html

Example #2

              df
                .writeStream()
                .outputMode("update")
                .trigger(Trigger.ProcessingTime(1000))
                .format("org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink")
                .option("checkpointLocation", "/tmp")
                .start("test/hello");

java.lang.InstantiationException: org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink
Caused by: java.lang.NoSuchMethodException: org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.<init>()
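
A note on the second error: an InstantiationException caused by NoSuchMethodException on `<init>()` is what Java reflection reports when a class is created reflectively but has no no-argument constructor. That suggests the value passed to format() is instantiated that way, so a sink class whose constructors take arguments cannot be named directly. The snippet below is only a minimal plain-Java sketch of that mechanism. SinkLike and ProviderLike are hypothetical stand-ins, not the connector's real classes, and no Spark or Elasticsearch code is involved.

```java
public class NoArgConstructorDemo {

    // Hypothetical stand-in for a sink class whose only constructor takes arguments.
    public static class SinkLike {
        private final String resource;

        public SinkLike(String resource) {
            this.resource = resource;
        }

        public String getResource() {
            return resource;
        }
    }

    // Hypothetical stand-in for a provider class that can be created without arguments.
    public static class ProviderLike {
        public ProviderLike() {
        }
    }

    // Returns true if the class declares a no-argument constructor,
    // which reflective "create by class name" instantiation relies on.
    public static boolean hasNoArgConstructor(Class<?> cls) {
        try {
            cls.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            // Same exception type as the "Caused by" line in the stack trace above.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("SinkLike: " + hasNoArgConstructor(SinkLike.class));
        System.out.println("ProviderLike: " + hasNoArgConstructor(ProviderLike.class));
    }
}
```

Running main prints false for SinkLike and true for ProviderLike, which mirrors why naming a class that needs constructor arguments fails at this step.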

(Kant Kodali) #2

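After some trial and error, it looks like the following fixes the problem. Compared with the examples above, the format is "org.elasticsearch.spark.sql" and the output mode is "append" instead of "update".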

     df
        .writeStream()
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1000))
        .format("org.elasticsearch.spark.sql")
        .option("checkpointLocation", "/tmp")
        .start("test/hello");

#3

I can confirm that you have to pass “org.elasticsearch.spark.sql” as the streaming format.

