Inconsistent Behavior Between Different Runs of the ES-Spark Connector

Hello Community,

We are doing ES indexing using the ES-Spark Connector (6.6.1) in our daily Spark job.

Here is the API call we make in our Spark code:

df.write
.format("org.elasticsearch.spark.sql")
.option("es.resource", "indexA_staging/indexA_main")
.option("es.write.operation", "upsert")
.option("es.mapping.id", "conformedid")
.option("es.batch.write.refresh", "false")
.option("es.index.auto.create", "no")
.option("es.nodes", "10.7.17.250:9200")
.option("es.batch.size.bytes", "2mb")
.option("es.batch.size.entries", "2000")
.mode("append")
.save("indexA_staging/indexA_main")

After the first run of our daily job, we sometimes find that a few records in ES are only partially indexed. When we detect this, we rerun the daily Spark job and the records are then fully indexed. Here is an example from today:

After the first run of our daily Spark job today, 85 records out of 12 million were only partially indexed, containing just the fields below (each document should have 234 fields in total):

{
  "_index": "indexA",
  "_type": "indexA_main",
  "_id": "indexa|6a90e8b8-8c8f-4842-b3a3-8d3da67eaca9",
  "_version": 1,
  "found": true,
  "_source": {
    "conformedid": "indexa|6a90e8b8-8c8f-4842-b3a3-8d3da67eaca9",
    "secondid": "DAD506F4-9563-42CD-BCC2-BB8E74B96A45",
    "updated": "2018-05-29T04:59:38.660Z",
    "soid": 7057358,
    "creationdate": "2018-05-29T11:33:04.730Z",
    "path": "dummy text",
    "labels": "dummy text",
    "isupdated": false,
    "status": "dummy text",
    "response": false,
    "pubilish date": "dummy text"
  }
}

After the second run of the same daily job today, all records (about 12 million) are fully indexed with all 234 fields:
{
  "_index": "indexA",
  "_type": "indexA_main",
  "_id": "indexa|6a90e8b8-8c8f-4842-b3a3-8d3da67eaca9",
  "_version": 1,
  "found": true,
  "_source": {
    "conformedid": "indexa|6a90e8b8-8c8f-4842-b3a3-8d3da67eaca9",
    "secondid": "DAD506F4-9563-42CD-BCC2-BB8E74B96A45",
    "updated": "2018-05-29T04:59:38.660Z",
    "soid": 7057358,
    "creationdate": "2018-05-29T11:33:04.730Z",
    "path": "dummy text",
    "labels": "dummy text",
    "isupdated": false,
    "status": "dummy text",
    "response": false,
    "pubilish date": "dummy text",
    "fieldA": "dummy fieldA value",
    "fieldB": "dummy fieldB value",
    "fieldC": "dummy fieldC value",
    "fieldD": "dummy fieldD value",
    "fieldE": "dummy fieldE value",
    "fieldF": "dummy fieldF value",
    "fieldG": "dummy fieldG value",
    ... total 234 fields
  }
}

Both runs completed successfully.

I wonder whether our batches are getting truncated. Have we misconfigured anything? How should we check for errors in the ES and Spark logs?
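Would something like the following be a reasonable way to spot partially indexed documents? It is only a rough sketch that reads the index back through the connector; fieldA stands in for any field that should always be present, and spark is our SparkSession.

import org.apache.spark.sql.functions.col

// Rough sketch: read indexA back through the connector and count documents
// that are missing a field which every document should contain.
val indexed = spark.read
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "10.7.17.250:9200")
.load("indexA_staging/indexA_main")

// fieldA is just an example; any always-present field would do here.
val partiallyIndexed = indexed.filter(col("fieldA").isNull)
println(s"partially indexed documents: ${partiallyIndexed.count()}")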

Are you sure that the mapping id is unique per document? Is there a chance that multiple records with the same document id are being applied in different partitions of the job? Your settings should generally be safe, but I'm mostly concerned that some sort of concurrent modification is happening across different partitions, and that those records should instead be collapsed together before performing the upsert.
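If you do find duplicate ids, one option is to collapse each dataframe down to a single row per conformedid before writing. Something along these lines might work as a starting point (just a sketch; it assumes your updated column reflects which row is most recent, so adjust the ordering or aggregation to whatever merge semantics you actually need):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Sketch: keep only the most recent row per document id so that two
// partitions never upsert different versions of the same document.
val latestPerId = Window.partitionBy(col("conformedid")).orderBy(col("updated").desc)

val deduped = df
.withColumn("rn", row_number().over(latestPerId))
.filter(col("rn") === 1)
.drop("rn")

// deduped can then be written with the same upsert options as before.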

Hi @james.baiera

Thanks for your reply. I confirm that the mapping id is unique in our application.

That makes sense. Here is where concurrent writes could be happening.

When we index indexA_main, we actually divide its data into two dataframes. One dataframe (df) is indexed as described above; the other (df2) is indexed as follows, using an update script. In the problematic records in ES, we only see fields from df2 and none from df.

df.write
.format("org.elasticsearch.spark.sql")
.option("es.resource", "indexA_staging/indexA_main")
.option("es.write.operation", "upsert")
.option("es.mapping.id", "conformedid")
.option("es.batch.write.refresh", "false")
.option("es.index.auto.create", "no")
.option("es.nodes", "10.7.17.250:9200")
.option("es.batch.size.bytes", "2mb")
.option("es.batch.size.entries", "2000")
.mode("append")
.save("indexA_staging/indexA_main")

// We have some other child-document indexing jobs between the df write and the df2 write;
// these jobs take about 30 minutes.


df2.write
.format("org.elasticsearch.spark.sql")
.option("es.resource", "indexA_staging/indexA_main")
.option("es.mapping.id", "conformedid")
.option("es.batch.write.refresh", "false")
.option("es.index.auto.create", "no")
.option("es.nodes", "10.7.17.250:9200")
.option("es.batch.size.bytes", "2mb")
.option("es.batch.size.entries", "10000")
.option("es.update.script", "def update =false;...")
.option("es.update.script.lang", "painless")
.option("es.write.operation", "upsert")
.mode("append")
.save("indexA_staging/indexA_main")

Is there a way to ensure that df2's indexing only runs after df's indexing has finished completely?
Or is there a way to prevent the ES service from consolidating upsert requests that have the same mapping id?

Thanks!
