Spark Structured Streaming - sink in append mode

Hi James,

Great job on the Spark 2.2.0 Structured Streaming support!! I tried it and it works well.

However, I wonder why you limited the sink to work only in APPEND mode. If the user provides an es.mapping.id column, you could support UPDATE mode easily (actually, with no code change at all). When the data for a certain window of time arrives, the sink writes it to Elasticsearch using that ID. When an update arrives for the same ID, Elasticsearch overwrites that document, since the same ID was used.
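
For example, this is roughly what I have in mind. It is only a sketch: it assumes an aggregated streaming Dataset<Row> named grouped with a key column holding the document ID, index1/type1 is just a placeholder index, and of course the sink rejects the update output mode today.

StreamingQuery query = grouped
        .writeStream()
        .outputMode("update")                                      // rejected by the sink today; only append is accepted
        .format("es")
        .option("es.mapping.id", "key")                            // column to use as the Elasticsearch document _id
        .option("checkpointLocation", "/tmp/stream1_checkpoint")
        .start("index1/type1");                                    // placeholder index/type; same _id means the document gets overwritten

query.awaitTermination();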

Adding the ability to work in UPDATE mode would make Elasticsearch a great choice as a sink for our project.

BTW, when is elasticsearch-hadoop 6.0.0 planned to become a GA release?

Many Thanks,
Haim

Thanks for trying out the new structured streaming feature and for your feedback on it! We agree on the UPDATE mode sentiments and have plans to eventually support it in a later release. We wanted to make sure that we were supporting what we felt was the most common use case in the first iteration. Keep an eye open though!

Hi Haim:

I am trying to use the ES sink for Structured Streaming but did not find any working example. I tried the example at https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-sql-streaming in a test application with a socket as the source, and it seems to hang. If you've been able to use it, can you please share a sample or some documentation on its usage?

Thanks

Hi Mans,

It works really well for me. Several considerations:

  1. End the program with query.awaitTermination().
  2. Configure a checkpoint location.
  3. The output mode must be append.
  4. In the SparkSession you must configure the Elasticsearch connection details.
  5. On datasets with a groupBy you must configure a watermark.
  6. The start method on the query should contain the index name and doc type.
  7. Check that your logging is configured correctly, otherwise you may not see Spark's errors; all of the settings above throw exceptions when they are not set correctly.
  8. Spark 2.2.0 is required.

Here is my full Java code for handling CSV files. I have another program that creates those files every 10 seconds and adds them to the input directory. I ran it both on my local machine and on a Spark cluster (I had to remove the master declaration from the SparkSession to let spark-submit.sh define it).

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.*;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import static org.apache.spark.sql.functions.*;

public class SparkJavaElasticStreamTest {

    public static void main(String[] args) {
        try {
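            // Elasticsearch connection details for the sink are configured on the SparkSession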
            SparkSession spark = SparkSession.builder()
                    .config(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, "elastic")
                    .config(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, "somepass")
                    .config(ConfigurationOptions.ES_NODES, "127.0.0.1")
                    .config(ConfigurationOptions.ES_PORT, "9200")
                    .appName("StreamingElastic")
                    .master("local[*]")
                    .getOrCreate();

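            // Schema of the incoming CSV files (batch id, key, event time, amount)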
            StructType schema = new StructType(new StructField[]{
                    new StructField("batch", DataTypes.StringType, false, Metadata.empty())
                    ,new StructField("key", DataTypes.StringType, false, Metadata.empty())
                    ,new StructField("time", DataTypes.TimestampType, false, Metadata.empty())
                    ,new StructField("amount", DataTypes.LongType, false, Metadata.empty())
            });

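            // Stream CSV files as they appear in the input directory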
            Dataset<Row> file = spark
                    .readStream()
                    .schema(schema)
                    .csv("/tmp/stream1");

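            // Aggregate over 5-minute windows sliding every 1 minute; the watermark bounds
            // how late data may arrive so that append mode can emit finalized windows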
            Dataset<Row> grouped = file
                    .withWatermark("time", "1 seconds")
                    .groupBy(window(col("time"), "5 minutes", "1 minute"))
                    .agg(last(col("batch")).alias("last_batch")
                            , count(col("*")).alias("count5")
                            , sum("amount").alias("sumAmount5")
                            , avg("amount").alias("avgAmount5")
                            , stddev("amount").alias("stddevAmount5")
                            , min("amount").alias("minAmount5")
                            , max("amount").alias("maxAmount5"));

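            // Write each finalized window to index1/type1; the sink requires append mode
            // and a checkpoint location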
            StreamingQuery query = grouped
                    .writeStream()
                    .outputMode("append")
                    .format("es")
                    .option("checkpointLocation", "/tmp/stream1_checkpoint")
                    .start("index1/type1");

            query.awaitTermination();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

Thanks Haim for the sample code. I will try that. Mans
