Hi Mans,
It works really well for me. Several considerations:
- End the program with query.awaitTermination();
- Configure checkpoint location.
- Output mode must be append.
- Configure the Elasticsearch connection details on the SparkSession.
- Datasets that use groupBy must have a watermark configured.
- The start method on the query should be given the index name and doc type, e.g. "index1/type1".
- Maybe your logging is not configured correctly, so you cannot see Spark's errors? All of the configurations above throw exceptions when they are missing or wrong (see the snippet after this list).
- Spark 2.2.0 is required.
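On the logging point, one quick way to make sure Spark's warnings and errors actually reach the console is to set the log level programmatically right after the session is built. This is only a minimal sketch; the class name and the "WARN" level are illustrative, not part of my program:

import org.apache.spark.sql.SparkSession;

public class LogLevelCheck {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("LogLevelCheck")
                .master("local[*]")
                .getOrCreate();

        // Overrides whatever log4j.properties says, so configuration
        // exceptions become visible on the console.
        spark.sparkContext().setLogLevel("WARN");
    }
}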
Here is my full Java code for handling CSV files. Another program of mine creates those files every 10 seconds and adds them to the input directory (a rough sketch of such a generator follows the main program below). I ran it both on my local machine and on a Spark cluster (I had to remove the master declaration from the SparkSession builder so that spark-submit.sh could define it).
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.*;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;

import static org.apache.spark.sql.functions.*;

public class SparkJavaElasticStreamTest {

    public static void main(String[] args) {
        try {
            // Elasticsearch connection details are configured on the SparkSession.
            SparkSession spark = SparkSession.builder()
                    .config(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, "elastic")
                    .config(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, "somepass")
                    .config(ConfigurationOptions.ES_NODES, "127.0.0.1")
                    .config(ConfigurationOptions.ES_PORT, "9200")
                    .appName("StreamingElastic")
                    .master("local[*]")
                    .getOrCreate();

            // Schema of the incoming CSV files.
            StructType schema = new StructType(new StructField[]{
                    new StructField("batch", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("key", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("time", DataTypes.TimestampType, false, Metadata.empty()),
                    new StructField("amount", DataTypes.LongType, false, Metadata.empty())
            });

            // Stream every CSV file that appears in the input directory.
            Dataset<Row> file = spark
                    .readStream()
                    .schema(schema)
                    .csv("/tmp/stream1");

            // The watermark is required for the windowed groupBy in append mode.
            Dataset<Row> grouped = file
                    .withWatermark("time", "1 seconds")
                    .groupBy(window(col("time"), "5 minutes", "1 minute"))
                    .agg(last(col("batch")).alias("last_batch"),
                            count(col("*")).alias("count5"),
                            sum("amount").alias("sumAmount5"),
                            avg("amount").alias("avgAmount5"),
                            stddev("amount").alias("stddevAmount5"),
                            min("amount").alias("minAmount5"),
                            max("amount").alias("maxAmount5"));

            // Append mode, a checkpoint location, and "index/type" in start() are all required.
            StreamingQuery query = grouped
                    .writeStream()
                    .outputMode("append")
                    .format("es")
                    .option("checkpointLocation", "/tmp/stream1_checkpoint")
                    .start("index1/type1");

            query.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
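For completeness, the file-generating side could look something like the sketch below. This is not my actual generator, just an illustration of the pattern; the /tmp/stream1 path, the row count, and the column values are assumptions chosen to match the schema above.

import java.io.IOException;
import java.nio.file.*;
import java.sql.Timestamp;
import java.util.UUID;

public class CsvFileGenerator {

    public static void main(String[] args) throws IOException, InterruptedException {
        Path inputDir = Paths.get("/tmp/stream1");
        Files.createDirectories(inputDir);

        int batch = 0;
        while (true) {
            // One small CSV per iteration with columns: batch,key,time,amount.
            // The timestamp uses java.sql.Timestamp's default string form; adjust
            // the reader's timestampFormat option if Spark does not parse it.
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 10; i++) {
                sb.append("batch").append(batch).append(',')
                        .append(UUID.randomUUID()).append(',')
                        .append(new Timestamp(System.currentTimeMillis())).append(',')
                        .append((long) (Math.random() * 100)).append('\n');
            }

            // Write to a temporary file first, then move it into the watched
            // directory, so the stream never picks up a half-written file.
            Path tmp = Files.createTempFile("stream", ".csv");
            Files.write(tmp, sb.toString().getBytes());
            Files.move(tmp, inputDir.resolve("batch_" + batch + ".csv"),
                    StandardCopyOption.ATOMIC_MOVE);

            batch++;
            Thread.sleep(10_000); // new file every 10 seconds
        }
    }
}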