Hello,
I have a Spark job that loads data from multiple indices and writes the enriched data back into the index each document came from. For that, I use the pattern '{index}' to specify the target index dynamically. However, Spark also writes into indices that were never loaded in the first place. To simplify the problem: I load just one document from the index 'sales', and I see that Spark writes data back into the index 'sales' *and* into another, closed index in Elasticsearch called sales-00001.
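For clarity, my understanding of '{index}' is that the connector substitutes each document's field value into the resource pattern at write time. A minimal sketch of that substitution in plain Scala (no Elasticsearch involved; the pattern-resolution logic here is my own illustration, not the connector's actual code):

```scala
object PatternDemo {
  // Illustrative only: mimic how a per-document field value could be
  // substituted into a resource pattern such as "{index}".
  def resolveResource(pattern: String, doc: Map[String, Any]): String = {
    val FieldRef = "\\{([^}]+)\\}".r
    FieldRef.replaceAllIn(pattern, m =>
      doc.getOrElse(m.group(1),
        sys.error(s"missing field ${m.group(1)}")).toString)
  }

  def main(args: Array[String]): Unit = {
    val doc = Map("index" -> "sales", "saleId" -> "42")
    println(resolveResource("{index}", doc)) // prints "sales"
  }
}
```

So a row whose `index` column is "sales" should, as far as I understand, only ever be routed to the index 'sales'.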
Elasticsearch version: 7.6.2;
Spark version: 2.2.2;
Scala version: 2.11.11.
My code:
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.elasticsearch.spark.sql._

val spark = SparkSession
  .builder()
  .appName("Merge-Articles")
  .master("local[*]")
  .config("spark.es.nodes", "host:9243")
  .config("spark.es.port", "443")
  .config("spark.es.nodes.wan.only", "true")
  .config("spark.es.nodes.discovery", "false")
  .config("spark.es.index.auto.create", "true")
  .config("es.net.ssl", "true")
  .config("es.nodes.resolve.hostname", "false")
  .config("es.mapping.date.rich", "false")
  .config("es.read.field.as.array.include", "tags,articleMerged,customerMerged")
  .config("es.write.operation", "update")
  .config("es.read.metadata", "true")
  .getOrCreate()

val sc = spark.sparkContext
val sql = new SQLContext(sc)
val articles = sql.esDF("articles","?q=product_ID:178799").limit(1)
val sales = sql.esDF("sales","?q=product_ID:178799").limit(1)
articles.createOrReplaceTempView("articles")
sales.createOrReplaceTempView("sales")
val sqlStr =
  """
    |SELECT
    |  *,
    |  sales._metadata._id AS saleId,
    |  sales._metadata._index AS index
    |FROM articles
    |JOIN sales
    |  ON sales.product_ID = articles.product_ID
  """.stripMargin
val cfg = Map(
  "es.resource" -> "{index}",
  "es.mapping.id" -> "saleId",
  "es.mapping.exclude" -> "saleId,index"
)

val flaggedMergedArticles = spark.sql(sqlStr)
flaggedMergedArticles.saveToEs(cfg)
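In case it helps narrow the problem down: since I only read from 'sales', I would expect every row's `index` column to be "sales", so nothing should target sales-00001. A rough sketch of the restriction I have in mind, in plain Scala collections standing in for the DataFrame (the `Row` case class and names here are hypothetical, just to show the filter):

```scala
object FilterDemo {
  // Hypothetical stand-in for one joined row: its id and target index.
  case class Row(saleId: String, index: String)

  // Keep only rows whose target index is one we actually read from.
  def restrictToKnownIndices(rows: Seq[Row], known: Set[String]): Seq[Row] =
    rows.filter(r => known.contains(r.index))

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row("1", "sales"), Row("2", "sales-00001"))
    // Only the "sales" row survives the filter.
    println(restrictToKnownIndices(rows, Set("sales")))
  }
}
```

Even with this expectation, the write still touches the closed index sales-00001, which is what I don't understand.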
Regards,
Hamza