While using the Spark DataFrame API to write to Elasticsearch, I noticed that duplicate rows are written in certain situations. That is, writing the exact same DataFrame sometimes produces duplicate rows in Elasticsearch and sometimes does not.
I can confirm that the duplicate rows exist as follows:
//Scala code to write a DataFrame to Elasticsearch.
import org.apache.spark.sql.DataFrame
import org.elasticsearch.spark.sql._

def writeResultToES(data: DataFrame, indexName: String, server: String, port: Int): Unit = {
  //Delete the index if it already exists (using some in-house REST code).
  SearchResultHandler.deleteIndex(server, port, indexName)
  //Write each row of the DataFrame as a document of type "result".
  data.saveToEs(s"$indexName/result")
}
writeResultToES(someDF, "cba75065-63c3-48f4-83a8-7baeffb87132", "localhost", 9200)
someDF.count() //yields 9609 rows.
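To rule out duplicates in the DataFrame itself, I compare the raw and de-duplicated counts (a minimal sketch; if the two counts match, the extra rows are being introduced on the write path):

//Sketch: compare raw vs. de-duplicated row counts of the source DataFrame.
val total = someDF.count()
val unique = someDF.distinct().count()
println(s"total=$total, unique=$unique")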
//Command line:
curl -XGET 'http://localhost:9200/cba75065-63c3-48f4-83a8-7baeffb87132/_count'
{"count":10080,"_shards":{"total":5,"successful":5,"failed":0}}
If I pull the Elasticsearch results into a Scala collection and call distinct on it, the row count comes to 9609.
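For reference, that verification looks roughly like this (a minimal sketch; I read the index back through the same connector, and spark is the active SparkSession):

import org.elasticsearch.spark.sql._

//Read the index back as a DataFrame and pull it into a local collection.
val esRows = spark.sqlContext.esDF("cba75065-63c3-48f4-83a8-7baeffb87132/result").collect().toSeq
println(esRows.size)          //10080 -- raw document count
println(esRows.distinct.size) //9609  -- after removing duplicate rows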
Please advise on what I may be missing. The interesting thing is that this problem only happens intermittently (about 1 in 7 runs); the exact same DataFrame written at another point in time yields 9609 rows.
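In case it helps narrow things down: I am wondering whether mapping a unique column to the document _id would make the write idempotent, so that a re-executed Spark task would overwrite documents rather than append new ones. A sketch of what I have in mind, using the connector's es.mapping.id setting (the column name "id" is hypothetical; it would be whatever unique key someDF carries):

//Sketch: route each row to a deterministic document _id so retried tasks
//upsert instead of duplicating. "id" is a placeholder for the key column.
data.saveToEs(s"$indexName/result", Map("es.mapping.id" -> "id"))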
Thanks,
Muthu