Ctx._source is not up to date

Hi All,

I have a strange problem using the Update API with a Painless script to update a document when there are concurrent updates on the same document. It seems to happen only on nested fields, randomly, with a frequency of roughly 1 in 100,000 updates in production.
I'll try to describe it with an example:

  1. I add a document to the index using the `upsert` option of the Update API, so in this case no Painless code is executed. The `_source` looks like this:

    ```json
    {
      "custom": { "field1": "value1" },
      "standard": { "field2": "value2" },
      "product": [
        { "standard": { "id": "12", "field1": "value1" }, "custom": { "field2": "value2" } }
      ]
    }
    ```

    Note that "product" is mapped as `nested`.

  2. Immediately after the upsert, with a new bulk action, I send a new Update call to add `"field3": "value3"` to `product.standard` where `id = 12`. What I get is:

    ```json
    {
      "custom": { "field1": "value1" },
      "standard": { "field2": "value2" },
      "product": [
        { "standard": { "id": "12", "field3": "value3" } }
      ]
    }
    ```

    So the nested sub-document is not updated but rebuilt from scratch.
    The script I'm using is this:

```painless
void updateFields(def source, def values) {
    for (item in values.entrySet()) {
        def type = item.getKey();

        // if the bucket is missing (or has the wrong shape), start from an empty map
        if (source[type] instanceof ArrayList || source[type] == null) {
            source[type] = [:];
        }

        source[type].putAll(item.getValue());
        // drop null values and empty lists (size() is the List method, not length)
        source[type].values().removeIf(v -> (v instanceof List && v.size() == 0) || v == null);
    }
}

def source = ctx._source;
def fieldsParam = params.fields;

if (fieldsParam.size() != 0) {
    updateFields(source, fieldsParam);
}

def relatedParam = params.related;

if (relatedParam.size() != 0) {
    for (item in relatedParam.entrySet()) {
        def classes = item.getValue();

        for (classObj in classes.entrySet()) {
            def relation = classObj.getKey();

            // the nested field holds a list of sub-documents
            if (source[relation] == null) {
                source[relation] = [];
            }

            def classSubdocuments = classObj.getValue();
            ArrayList done = new ArrayList();
            ArrayList toDelete = new ArrayList();

            // update, or mark for deletion, the sub-documents that already exist
            for (def subDoc : source[relation]) {
                String subdocumentId = subDoc['standard']['id'];
                if (classSubdocuments.containsKey(subdocumentId)) {
                    def value = classSubdocuments[subdocumentId];

                    if (value == 'deleted') {
                        toDelete.add(subdocumentId);
                    } else {
                        updateFields(subDoc, value);
                    }

                    done.add(subdocumentId);
                }
            }

            source[relation].removeIf(it -> toDelete.contains(it.standard.id + ''));

            // anything not seen above is a new sub-document: append it
            def toAdd = classSubdocuments.keySet();
            toAdd.removeAll(done);
            toAdd = toAdd.toArray();

            for (def toAddId : toAdd) {
                def classSubdoc = classSubdocuments.get(toAddId);
                source[relation].add(classSubdoc);
            }
        }
    }
}
```
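To make the intended merge semantics concrete, here is a minimal Python sketch of what `updateFields` is meant to do for a single sub-document (illustrative only; the field names come from the example above). When `ctx._source` is up to date, the incoming fields should be merged into the existing bucket, not replace it:

```python
def update_fields(source, values):
    """Merge each type bucket ('standard', 'custom', ...) from `values`
    into `source`, then drop null values and empty lists."""
    for type_, fields in values.items():
        # re-create the bucket only if it is missing or has the wrong shape
        if source.get(type_) is None or isinstance(source.get(type_), list):
            source[type_] = {}
        source[type_].update(fields)
        source[type_] = {
            k: v for k, v in source[type_].items()
            if v is not None and not (isinstance(v, list) and len(v) == 0)
        }
    return source

sub_doc = {"standard": {"id": "12", "field1": "value1"},
           "custom": {"field2": "value2"}}
update_fields(sub_doc, {"standard": {"field3": "value3"}})
# "field3" is merged into the existing "standard" map; "field1" and "custom" survive
```

This is the result I expect; the buggy documents instead look as if the existing bucket was `null` when the script ran, so it was re-created from scratch.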


The problem seems to be that `ctx._source` is not up to date when the second update arrives, so the script re-creates the nested sub-document instead of merging into it.
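For reference, the second call looks roughly like this (index, type, id and the script body are made up / elided here; the `_retry_on_conflict` and `refresh` settings are the ones from my notes below):

```json
POST /_bulk?refresh=false
{ "update": { "_index": "myindex", "_type": "mytype", "_id": "1", "_retry_on_conflict": 100 } }
{ "script": { "lang": "painless", "inline": "...", "params": { "fields": {}, "related": { "rel": { "product": { "12": { "standard": { "field3": "value3" } } } } } } } }
```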

Notes:

  1. I'm using ES 5.6
  2. I'm NOT able to reproduce this problem locally (its frequency is far too low), but it seems the second update goes into the `if (source[type] instanceof ArrayList || source[type] == null)` branch
  3. the Update API is called with `_retry_on_conflict = 100`
  4. the Bulk API is called with `refresh = false`

Could it be related to https://github.com/elastic/elasticsearch/issues/19269? We have a cluster as well.