Spark Structured Streaming - sink in append mode

Hi Mans,

It works really well for me. Several considerations:

  1. End the program with

query.awaitTermination();

  2. Configure a checkpoint location.
  3. The output mode must be append; with a watermark, each window is emitted only after the watermark passes the end of that window, so expect some delay before results show up.
  4. The Elasticsearch connection details must be configured on the SparkSession.
  5. A watermark must be configured on the groupBy dataset.
  6. The start method on the query should be given the index name and doc type.
  7. Maybe your logging configuration is not set correctly, so you cannot see Spark's errors? All of the settings above throw exceptions when misconfigured; see the logging sketch after this list.
  8. Spark 2.2.0 is required.

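On the logging point, here is a minimal sketch of what I mean, assuming Spark 2.2 with its bundled log4j 1.x (the class name is only for illustration). Setting the log level to WARN keeps Spark's INFO chatter out of the way so the exceptions caused by the points above actually show up in the console:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.SparkSession;

public class LogLevelSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("LogLevelSketch")
                .master("local[*]")
                .getOrCreate();

        // Silence Spark's INFO output; ERROR-level exceptions from the
        // streaming query (missing checkpoint, wrong output mode, missing
        // watermark, ...) remain visible.
        spark.sparkContext().setLogLevel("WARN");

        // The same thing done directly through the log4j 1.x API.
        Logger.getRootLogger().setLevel(Level.WARN);
    }
}
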
Here is my full Java code for handling the CSV files. Another program of mine creates those files every 10 seconds and adds them to the input directory. I ran it both on my local machine and on a Spark cluster (I had to remove the master declaration from the SparkSession to let spark-submit.sh define it; see the example invocation after the code).

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.*;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import static org.apache.spark.sql.functions.*;

public class SparkJavaElasticStreamTest {

    public static void main(String[] args) {
        try {
            // Elasticsearch connection details are configured on the SparkSession
            SparkSession spark = SparkSession.builder()
                    .config(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, "elastic")
                    .config(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, "somepass")
                    .config(ConfigurationOptions.ES_NODES, "127.0.0.1")
                    .config(ConfigurationOptions.ES_PORT, "9200")
                    .appName("StreamingElastic")
                    .master("local[*]")
                    .getOrCreate();

            // Schema of the incoming CSV files
            StructType schema = new StructType(new StructField[]{
                    new StructField("batch", DataTypes.StringType, false, Metadata.empty())
                    ,new StructField("key", DataTypes.StringType, false, Metadata.empty())
                    ,new StructField("time", DataTypes.TimestampType, false, Metadata.empty())
                    ,new StructField("amount", DataTypes.LongType, false, Metadata.empty())
            });

            // Stream new CSV files as they are added to the input directory
            Dataset<Row> file = spark
                    .readStream()
                    .schema(schema)
                    .csv("/tmp/stream1");

            // Windowed aggregation; the watermark is required for append mode
            Dataset<Row> grouped = file
                    .withWatermark("time", "1 seconds")
                    .groupBy(window(col("time"), "5 minutes", "1 minute"))
                    .agg(last(col("batch")).alias("last_batch")
                            , count(col("*")).alias("count5")
                            , sum("amount").alias("sumAmount5")
                            , avg("amount").alias("avgAmount5")
                            , stddev("amount").alias("stddevAmount5")
                            , min("amount").alias("minAmount5")
                            , max("amount").alias("maxAmount5"));

            // Append output mode, checkpoint location, and the es format;
            // start() receives the target index and doc type
            StreamingQuery query = grouped
                    .writeStream()
                    .outputMode("append")
                    .format("es")
                    .option("checkpointLocation", "/tmp/stream1_checkpoint")
                    .start("index1/type1");

            query.awaitTermination();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
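
And for reference, a spark-submit call for the cluster run would look something like this - the master URL, the elasticsearch-spark package version and the jar name are placeholders that have to match your own setup and your Scala/Elasticsearch versions:

spark-submit \
    --master spark://your-master:7077 \
    --packages org.elasticsearch:elasticsearch-spark-20_2.11:6.0.0 \
    --class SparkJavaElasticStreamTest \
    spark-java-elastic-stream-test.jar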