Spark structured streaming Elasticsearch integration issue. Data source es does not support streamed writing

I am writing a Spark structured streaming application in which data processed with Spark needs be sink'ed to elastic search.

This is my development environment, hence I have a standalone Elastic search.

I have tried following two ways to sink the data in the DataSet to ES.

1.ds.writeStream().format("org.elasticsearch.spark.sql").start("spark/orders"); 2.ds.writeStream().format("es").start("spark/orders");

In both cases I am getting the following error:

Caused by:

java.lang.UnsupportedOperationException: Data source es does not support streamed writing
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287) ~[spark-sql_2.11-2.1.1.jar:2.1.1]
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:272) ~[spark-sql_2.11-2.1.1.jar:2.1.1]
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:213) ~[spark-sql_2.11-2.1.1.jar:2.1.1]
pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>

</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>

</dependency>
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>5.6.1</version>
</dependency>

Appreciate any help in this regard.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.