Duplicate rows

While using the Spark DataFrame API to write to Elasticsearch, I noticed that duplicate rows are written in certain situations. That is, the very same DataFrame sometimes writes duplicate rows into Elasticsearch and sometimes does not, depending on when the write happens.
I am able to confirm the duplicate rows using the following:

//Scala code to write a DataFrame to Elasticsearch.
import org.apache.spark.sql.DataFrame
import org.elasticsearch.spark.sql._

def writeResultToES(data: DataFrame, indexName: String, server: String, port: Int): Unit = {
    SearchResultHandler.deleteIndex(server, port, indexName) //Using some REST code to delete the index if it exists.
    data.saveToEs(s"$indexName/result")
}

writeResultToES(someDF, "cba75065-63c3-48f4-83a8-7baeffb87132", "localhost", 9200)
someDF.count() //yields 9609 rows.

//Command line:
curl -XGET 'http://localhost:9200/cba75065-63c3-48f4-83a8-7baeffb87132/_count'
{"count":10080,"_shards":{"total":5,"successful":5,"failed":0}}

If I pull the Elasticsearch results back into a Scala collection and call distinct on them, the count is 9609 again.
Please advise on what I may be missing. The interesting thing is that this problem only happens at random (roughly 1 in 7 runs); the exact same DataFrame formed at another point in time actually yields 9609 documents.
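
For completeness, this is roughly the read-back check I use to compare raw vs. distinct counts (a minimal sketch; it assumes the connector's DataFrame reader and the default es.nodes of localhost:9200):

//Read-back check (sketch): raw count vs. distinct count in the index.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val readBack = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("cba75065-63c3-48f4-83a8-7baeffb87132/result")
readBack.count()            //10080 -- includes the duplicates
readBack.distinct().count() //9609  -- matches someDF.count()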

Thanks,
Muthu

Hi, can you try setting the write operation to upsert and the mapping id to the primary key of your data? That way, if a record already exists in the index it is updated rather than created again.

If you are using Spark,

val conf = new SparkConf().setAppName("myAppName")
conf.set("es.write.operation", "upsert")
conf.set("es.mapping.id", "your-primary-key-field-in-data-frame")

Hope it helps ...

Hi Rulanitee,

Thanks for the tip. Let me try it out. Would I be able to set upsert and es.mapping.id on every saveToEs() call instead? The reason is that my Spark session is long-running and deals with heterogeneous data.
But the duplicate rows showing up at random... does this not sound like a bug?

Thanks,
Muthu

Hi, I think you can, as follows:

saveToEs(rdd, resource, Map("es.mapping.id" -> "your_primary_field", "es.write.operation" -> "upsert"))
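
Since you are writing DataFrames through org.elasticsearch.spark.sql, the same per-call override works there as well. A minimal sketch, assuming your DataFrame has a unique "id" column to serve as the key:

//Per-call upsert for a DataFrame ("id" is a placeholder for your key column).
import org.elasticsearch.spark.sql._

someDF.saveToEs(s"$indexName/result",
  Map("es.mapping.id" -> "id", "es.write.operation" -> "upsert"))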

Hope it helps ...
