Painless script issue with putAll() and add() methods

I have written a Painless script to update a nested field named "workflowassociated", which contains fields such as "teamroleids", "userid", and "workflowid". The script applies the required conditions to update the "workflowassociated" field as per the requirement.

The issue I have faced: the script below works fine in Kibana, as expected, but not when I use the same script in my code base with "@elastic/elasticsearch": "^8.10.0", "ts-node": "^8.6.0", and "typescript": "^4.3.5". There the script does not work properly: it removes all the existing objects in workflowassociated and adds only the new object when used with the update-by-query API in Elasticsearch.

Example: say workflowassociated initially contains this array of objects:

[{
  "userid": 123,
  "workflowid": 123,
  "teamroleids": [1, 2],
  "firstname": "",
  "middlename": "",
  "lastname": ""
}]

Script:

    const script: Script = {
      source: `
      if (ctx._source.containsKey('workflowassociated')) {
        if (params.updateDataObj[0].teamroleids == null || params.updateDataObj[0].teamroleids.isEmpty()) {
          ctx._source.workflowassociated.removeIf(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid)
        } else {
          if (ctx._source.workflowassociated.size() == 1 && ctx._source.workflowassociated[0].userid == null) {
            ctx._source.workflowassociated = [];
            ctx._source.workflowassociated.add(params.updateDataObj[0]);
          } else {
            def existingUser = ctx._source.workflowassociated.find(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid);
            if (existingUser != null) {
              existingUser.putAll(params.updateDataObj[0]);
            } else {
              ctx._source.workflowassociated.add(params.updateDataObj[0]);
            }
          }
        }
      } else {
        ctx._source.workflowassociated = params.updateDataObj;
      }
      `,
      params: {
        updateDataObj: data.workflowassociated,
      },
    };

Hi Neeraj, welcome to the community! :wave:

I'm trying to replicate your script's behavior in Kibana Dev Tools (if there is a bug, it's most likely in the script and it's unrelated to the language client we use, so Dev Tools should behave the same).

If I understood correctly, you want the script to either

  • update an entry in the workflowassociated array that matches your input data object by workflowid and userid, if it's found, or
  • add the data object as a new entry to workflowassociated if there's no match.

Here's how I tested the script:

PUT painlesstest
{
  "mappings": {
    "properties": {
      "workflowassociated": {
        "type": "nested",
        "properties": {
          "userid": {"type": "integer"},
          "workflowid": {"type": "integer"},
          "teamroleids": {"type": "integer"},
          "firstname": {"type": "text"},
          "middlename": {"type": "text"},
          "lastname":  {"type": "text"}
        }
      }
    }
  }
}

PUT painlesstest/_doc/1
{
  "workflowassociated": [
    {
      "userid": 123,  // Will be updated
      "workflowid": 123,
      "teamroleids": [1, 2],
      "firstname": "",
      "middlename": "",
      "lastname": ""
    },
    {
      "userid": 456,  // Will NOT be updated
      "workflowid": 789,
      "teamroleids": [1, 2],
      "firstname": "Bob",
      "middlename": "",
      "lastname": "Roberts"
    }
  ]
}

POST painlesstest/_update_by_query
{
  "script": {
    "source": """
      if (ctx._source.containsKey('workflowassociated')) {
        if (params.updateDataObj[0].teamroleids == null || params.updateDataObj[0].teamroleids.isEmpty()) {
          ctx._source.workflowassociated.removeIf(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid)
        } else {
          if (ctx._source.workflowassociated.size() == 1 && ctx._source.workflowassociated[0].userid == null) {
            ctx._source.workflowassociated = [];
            ctx._source.workflowassociated.add(params.updateDataObj[0]);
          } else {
            def existingUser = ctx._source.workflowassociated.find(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid);
            if (existingUser != null) {
              existingUser.putAll(params.updateDataObj[0]);
            } else {
              ctx._source.workflowassociated.add(params.updateDataObj[0]);
            }
          }
        }
      } else {
        ctx._source.workflowassociated = params.updateDataObj;
      }    
    """,
    "params": {
      "updateDataObj": [{
        "userid": 123,
        "workflowid": 123,
        "teamroleids": [1, 2, 3],
        "firstname": "Alice",
        "middlename": "",
        "lastname": "Beal"
      }]
    },
    "lang": "painless"
  }
}


GET painlesstest/_doc/1
...
"workflowassociated": [
      {
        "firstname": "Alice",
        "middlename": "",
        "userid": 123,  // Was updated because of matching IDs
        "workflowid": 123,
        "teamroleids": [1, 2, 3],
        "lastname": "Beal"
      },
      {
        "firstname": "Bob",
        "middlename": "",
        "userid": 456,   // Was not updated
        "workflowid": 789,
        "teamroleids": [1, 2],
        "lastname": "Roberts"
      }
]

I also tested the use case when data contains a workflowid/userid pair that is NOT in workflowassociated yet, and the entry was added to the array.
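
For completeness, the params of that second test looked something like this (the IDs here are made-up values that don't match any existing entry, so the script falls through to the add() branch):

  // Hypothetical IDs that don't exist in the test document yet; the script's
  // find() returns null for them, so the object is appended via add().
  const params = {
    updateDataObj: [{ userid: 999, workflowid: 999, teamroleids: [7], firstname: 'Carol', middlename: '', lastname: 'Jones' }],
  };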

So to me it looks like the script is working properly, and I didn't see any evidence of existing objects being removed from workflowassociated. Could you test the same in your Kibana Dev Tools and describe where the script is not working as expected?

Alternatively, if you don't have Kibana installed, please walk me through an example where the script is not working, and provide details so I can replicate the issue (e.g. initial state of the document, data object, expected vs observed output after running the update).

Hi Adam Demjen,

Hope you are doing well. Thanks for the response; the script logic you described matches my requirements. I have tested the same script that I attached previously in Kibana Dev Tools, and it works fine there: I'm able to add/update the data in the workflowassociated field as expected. The issue is that when I use the same script in my code base with the Elasticsearch client "@elastic/elasticsearch": "^8.10.0", the behaviour is not the same: it removes all the existing objects in the workflowassociated field and adds only the new object that was sent in the params of the update-by-query API.

Please try to replicate the same script in an update-by-query call using the Elasticsearch client with Node.js.

Hey Neeraj,

I tried the same with the Node client "@elastic/elasticsearch": "^8.10.0", and the script worked as expected: it only updated the 1st entry in my example, leaving the other one intact.
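
For reference, this is roughly the code I ran (a minimal sketch; the node URL is a placeholder for my test cluster, and the script and params are the same ones from my Dev Tools example above):

  import { Client } from '@elastic/elasticsearch';

  // Minimal sketch of my Node client test; the node URL is a placeholder.
  const client = new Client({ node: 'http://localhost:9200' });

  async function run() {
    const result = await client.updateByQuery({
      index: 'painlesstest',
      script: {
        lang: 'painless',
        // Same Painless source as in the Dev Tools example above.
        source: `
          if (ctx._source.containsKey('workflowassociated')) {
            if (params.updateDataObj[0].teamroleids == null || params.updateDataObj[0].teamroleids.isEmpty()) {
              ctx._source.workflowassociated.removeIf(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid)
            } else {
              if (ctx._source.workflowassociated.size() == 1 && ctx._source.workflowassociated[0].userid == null) {
                ctx._source.workflowassociated = [];
                ctx._source.workflowassociated.add(params.updateDataObj[0]);
              } else {
                def existingUser = ctx._source.workflowassociated.find(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid);
                if (existingUser != null) {
                  existingUser.putAll(params.updateDataObj[0]);
                } else {
                  ctx._source.workflowassociated.add(params.updateDataObj[0]);
                }
              }
            }
          } else {
            ctx._source.workflowassociated = params.updateDataObj;
          }
        `,
        // Same data object as in the Dev Tools example above.
        params: {
          updateDataObj: [{ userid: 123, workflowid: 123, teamroleids: [1, 2, 3], firstname: 'Alice', middlename: '', lastname: 'Beal' }],
        },
      },
    });
    console.log(result.updated); // 1 -- one document updated, both array entries intact
  }

  run().catch(console.error);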

Could you share:

  • The Elasticsearch version you are using, and whether it's on Cloud or standalone
  • The part of your code that populates data.workflowassociated
  • The part of your code where the updateByQuery() call is made
  • Confirmation that you have tested with my examples and that the code removed all entries from workflowassociated (so you ended up with only the firstname: "Alice" entry)

so I can investigate further.

Hi Adam Demjen,

I was using the Elasticsearch Node client version "@elastic/elasticsearch": "^8.10.0".

Attaching my code below:

  private async updateWorkflowAssociatedInNewhire(data: SQSNewHireDataInterface, operation: string) {
    const indexName = await this.getIndexName(ModuleIds.Newhire, data.accountid);
    const filters = new Array<QueryDslQueryContainer>();

    filters.push({ term: { [NewHireFields.onboardingcandidateid]: data.onboardingcandidateid } });
    const advancedFilterQueries = {
      filters: filters,
    };
    const script: Script = {
      source: `
      if (ctx._source.containsKey('workflowassociated')) {
        if (params.updateDataObj[0].teamroleids == null || params.updateDataObj[0].teamroleids.isEmpty()) {
          ctx._source.workflowassociated.removeIf(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid)
        } else {
          if (ctx._source.workflowassociated.size() == 1 && ctx._source.workflowassociated[0].userid == null) {
            ctx._source.workflowassociated = [];
            ctx._source.workflowassociated.add(params.updateDataObj[0]);
          } else {
            def existingUser = ctx._source.workflowassociated.find(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid);
            if (existingUser != null) {
              existingUser.putAll(params.updateDataObj[0]);
            } else {
              ctx._source.workflowassociated.add(params.updateDataObj[0]);
            }
          }
        }
      } else {
        ctx._source.workflowassociated = params.updateDataObj;
      }
      `,
      params: {
        updateDataObj: data.workflowassociated,
      },
    };

    const result = await this.elasticClient.updateByQuery({
      index: indexName,
      body: {
        query: {
          bool: {
            filter: advancedFilterQueries.filters,
          },
        },
        script: script,
      },
    });

    if (result.updated) {
      this.logAndStore.info(
        `mode: UPDATE workflowassociated successful for newhire candidates with onboardingcandidateid: ${data.onboardingcandidateid}`,
        ModuleIds.Newhire,
        {
          data: data,
          accountid: data.accountid,
          onboardingcandidateid: data.onboardingcandidateid,
        }
      );
    } else {
      this.logAndStore.error(
        `Failed to ${operation} workflowassociated with OnboardingCandidateid:${data.onboardingcandidateid} in newhire index`,
        ModuleIds.Newhire,
        {
          data: data,
        }
      );
    }
  }
  public async newHireIndexManager(data: SQSNewHireDataInterface, operation: string): Promise<void> {
    try {
      // account-wise separation for new hire indices
      const indexName = await this.getIndexName(ModuleIds.Newhire, data.accountid);

      switch (operation) {
        case IndexOperations.CREATE: {
          if (Array.isArray(data.onboardingcandidateid) && data.onboardingcandidateid.length === 1) {
            data.onboardingcandidateid = data.onboardingcandidateid[0];
          } else if (typeof data.onboardingcandidateid !== 'number') {
            throw new Error('Incorrect newhire onboardingcandidateid in sqs data. Must be a number or number[] of length 1');
          }
          const res = this.elasticClient.create({
            index: indexName,
            id: String(data.onboardingcandidateid),
            body: data,
          });

          this.logAndStore.info(`mode: ${(await res).result} for newhire onboardingcandidateId: ${data.onboardingcandidateid}`, ModuleIds.Newhire, {
            data: data,
            accountid: data.accountid,
            candidatecode: data.candidatecode,
          });
          break;
        }

        case IndexOperations.UPDATE: {
          if (Array.isArray(data.onboardingcandidateid) && data.onboardingcandidateid.length === 1) {
            data.onboardingcandidateid = data.onboardingcandidateid[0];
          } else if (typeof data.onboardingcandidateid !== 'number') {
            throw new Error('Incorrect newhire onboardingCandidateId in sqs data. Must be a number or number[] of length 1');
          }

          if (data.workflowassociated) {
            try {
              await this.updateWorkflowAssociatedInNewhire(data, operation);
            } catch (error) {
              this.logAndStore.error('mode: Error in UPDATE request', ModuleIds.Newhire, { data: data });
            }
          }

          const res = this.elasticClient.update({
            index: indexName,
            id: String(data.onboardingcandidateid),
            body: {
              doc: data,
            },
            retry_on_conflict: 3 /* eslint-disable-line */, // REASON: property is part of elasticsearch api. cannot change
          });

          this.logAndStore.info(`mode: ${(await res).result} for newhire onboardingCandidateId: ${data.onboardingcandidateid}`, ModuleIds.Newhire, {
            data: data,
            accountid: data.accountid,
            candidatecode: data.candidatecode,
          });
          break;
        }

        case IndexOperations.DELETE: {
          // if onboardingcandidateid is a number then delete new hire with onboardingcandidateid. (single operation)
          if (!Array.isArray(data.onboardingcandidateid)) {
            const res = this.elasticClient.delete({
              index: indexName,
              id: String(data.onboardingcandidateid),
            });

            this.logAndStore.info(`mode: ${(await res).result} for newhire onboardingCandidateId: ${data.onboardingcandidateid}`, ModuleIds.Newhire, {
              data: data,
              accountid: data.accountid,
              candidatecode: data.candidatecode,
            });
          } else {
            // data.onboardingcandidateid is an array of newhire candidateids. Delete Bulk!
            const operations = data.onboardingcandidateid.map((id: number) => {
              return { delete: { _index: indexName, _id: id } };
            });
            const res = this.elasticClient.bulk({ index: indexName, body: operations });

            res
              .then((res) => {
                if (res.errors) {
                  const erroredDocuments = new Array<Record<string, unknown>>();

                  res.items.forEach((operation, i) => {
                    if (operation.delete?.error) {
                      erroredDocuments.push({
                        status: operation.delete.status,
                        error: operation.delete.error,
                        // Delete actions occupy a single line each in the bulk body
                        // (no document payload), so the failed action is at index i.
                        operation: operations[i],
                      });
                    }
                  });
                  this.logAndStore.error('mode: Errors in BULK_DELETE request', ModuleIds.Newhire, { erroredDocuments: erroredDocuments, data: data });
                } else {
                  this.logAndStore.info(`mode: BULK_DELETE successful for new hire candidateId: ${data.onboardingcandidateid}`, ModuleIds.Newhire, {
                    data: data,
                    accountid: data.accountid,
                    candidatecode: data.candidatecode,
                  });
                }
              })
              .catch((_error) => {
                throw new Error('Error While Bulk Deletion');
              });
          }
          break;
        }

        default:
          throw new Error(`unidentified operation: ${operation}`);
      }
    } catch (error) {
      this.logAndStore.error(`Failed to ${operation} Newhire with onboarding candidateId:${data.onboardingcandidateid}`, ModuleIds.Newhire, {
        error: error,
        data: data,
      });
      throw error;
    }
  }
export interface SQSNewHireDataInterface extends Omit<Partial<NewHireInterface>, 'accountid' | 'onboardingcandidateid' | 'candidatecode'> {
  accountid: number;
  onboardingcandidateid: number | number[];
  candidatecode?: number | number[];
}
export interface WorkflowAssociationInterface {
  workflowid: number;
  workflowstatus: number;
  workflowname: string;
  userid: number;
  firstname: string;
  middlename: string;
  lastname: string;
  email: string;
  teamroleids: number[];
  activationdates: string[] | DateString[];
  dateofjoining: string | DateString;
}

export interface NewHireInterface {
  accountid: number;
  onboardingcandidateid: number;
  email: string;
  firstname: string;
  middlename: string;
  lastname: string;
  candidateid: number;
  candidatecode: number;
  jobid: number;
  jobcode: number;
  jobtitle: string;
  joblocation: string;
  officelocation: string;
  offerlocation: string;
  offerdesignation: string;
  businessverticalid: string;
  practiceid: string;
  zoneid: string;
  departmentid: number;
  departmentname: string;
  createdbyuserid: number;
  createdbyuseremail: string;
  createdbyuserfirstname: string;
  createdbyusermiddlename: string;
  createdbyuserlastname: string;
  createdtime: string | DateString;
  processedtouserfullname: string;
  processedtouseremail: string;
  processedbyuserfullname: string;
  processedbyuseremail: string;
  lastprocessedon: string | DateString;
  lastlogin: string | DateString;
  workflowassociated: WorkflowAssociationInterface[];
  hirestatus: number;
  recenttaskcompleted: string | DateString;
  taskscompleted: number;
  totaltasks: number;
  progression: number;
}

In the above function I have defined the cases so that, once I receive the sequence of messages from the SQS queue, it processes each data object and updates the fields accordingly.

Sample SQS message:

{
    "operation": "UPDATE",
    "newHireData": {
        "accountid": 215,
        "onboardingcandidateid": 316,
        "workflowassociated": [
            {
                "firstname": "testt",
                "workflowstatus": 1,
                "workflowname": "dont change this workflow - 777 multiple task to complete",
                "dateofjoining": "2024-03-12T00:00:00.000Z",
                "middlename": "",
                "progression": 0,
                "userid": 15,
                "lastname": "name 15",
                "activationdates": [
                    "2024-03-13T06:22:44.000Z"
                ],
                "taskscompleted": 0,
                "recenttaskcompleted": null,
                "workflowid": 672,
                "email": "testname15@yopmail.com",
                "totaltasks": 1,
                "teamroleids": [
                    2, 5
                ]
            }
        ]
    }
}

Try sending different workflowassociated objects with different workflowids, userids, and teamroleids, and try calling the API several times at once with different data similar to the above. This is the scenario where I'm facing the issue with the Node client code.
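
To make the "several times at once" part concrete, the replay in my tests looks roughly like this (a sketch; replayConcurrently and messages are illustrative names, not actual code from my code base):

  // Sketch: replay a batch of SQS messages concurrently; they all target the
  // same document, so the update-by-query calls race against each other.
  async function replayConcurrently(
    manager: { newHireIndexManager(data: SQSNewHireDataInterface, operation: string): Promise<void> },
    messages: { operation: string; newHireData: SQSNewHireDataInterface }[]
  ): Promise<void> {
    await Promise.all(messages.map((m) => manager.newHireIndexManager(m.newHireData, m.operation)));
  }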

Thanks for sharing some context, Neeraj. The fact that your process is listening to an SQS queue is important. We might be looking at a race condition here.

I'd like to ask you to try and narrow the scope down to a minimum to rule out the possibility of a race condition:

  • Does the output differ if you send subsequent messages through SQS in quick succession, versus if you execute the same updates manually by calling the _update_by_query endpoint?
  • If you put a get() request after the call of updateWorkflowAssociatedInNewhire() and log the full content of the actual document, can you identify where workflowassociated gets cleared?

Furthermore, without diving too deep into your business logic, I'd verify this code block in newHireIndexManager():

// ...
              await this.updateWorkflowAssociatedInNewhire(data, operation);
// ...

          const res = this.elasticClient.update({
            index: indexName,
            id: String(data.onboardingcandidateid),
            body: {
              doc: data,
            },
          });

At first glance it looks like the doc is being overwritten by the update() call, and so the effect of updateWorkflowAssociatedInNewhire() is lost. It might be worth putting a get() + log of the document between these two function calls too.
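
Something along these lines (a sketch; client, indexName and docId stand for your own client instance and values):

  // Fetch and log the document right after the updateWorkflowAssociatedInNewhire()
  // call, to pinpoint the step where workflowassociated gets cleared.
  const doc = await client.get<NewHireInterface>({ index: indexName, id: String(docId) });
  console.log(JSON.stringify(doc._source?.workflowassociated, null, 2));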

Hi Adam,

I have cross-verified my code against the questions you raised. While sending subsequent messages through SQS, I observed that only the last update message object from the queue gets added to the workflowassociated array, and all the previous data is removed from the field. And if I update workflowassociated manually, by sending the update object through the params of the script, I'm not able to update/add the object in workflowassociated at all.

Related data:
Current data in my index: (screenshot attached)

Manually added data in the params of the script:

  private async updateWorkflowAssociatedInNewhire(data: SQSNewHireDataInterface, operation: string) {
    const indexName = await this.getIndexName(ModuleIds.Newhire, data.accountid);
    const filters = new Array<QueryDslQueryContainer>();

    filters.push({ term: { [NewHireFields.onboardingcandidateid]: data.onboardingcandidateid } });
    const advancedFilterQueries = {
      filters: filters,
    };
    const script: Script = {
      source: `
      if (ctx._source.containsKey('workflowassociated')) {
        if (params.updateDataObj[0].teamroleids == null || params.updateDataObj[0].teamroleids.isEmpty()) {
          ctx._source.workflowassociated.removeIf(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid)
        } else {
          if (ctx._source.workflowassociated.size() == 1 && ctx._source.workflowassociated[0].userid == null) {
            ctx._source.workflowassociated = [];
            ctx._source.workflowassociated.add(params.updateDataObj[0]);
          } else {
            def existingUser = ctx._source.workflowassociated.find(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid);
            if (existingUser != null) {
              existingUser.putAll(params.updateDataObj[0]);
            } else {
              ctx._source.workflowassociated.add(params.updateDataObj[0]);
            }
          }
        }
      } else {
        ctx._source.workflowassociated = params.updateDataObj;
      }
      `,
      params: {
        updateDataObj: [
          {
            firstname: 'testt',
            workflowstatus: 1,
            workflowname: 'dont change this workflow - 777 multiple task to complete',
            dateofjoining: '2024-03-12T00:00:00.000Z',
            middlename: '',
            progression: 0,
            userid: 23,
            lastname: 'name 23',
            activationdates: ['2024-03-13T06:22:44.000Z'],
            taskscompleted: 0,
            recenttaskcompleted: null,
            workflowid: 672,
            email: 'testname23@yopmail.com',
            totaltasks: 1,
            teamroleids: [2, 5]
          }
        ],
      },
    };

    const result = await this.elasticClient.updateByQuery({
      index: indexName,
      body: {
        query: {
          bool: {
            filter: advancedFilterQueries.filters,
          },
        },
        script: script,
      },
    });

    const ans = await this.elasticClient.search<NewHireInterface>({
      index: indexName,
      body: {
        query: {
          bool: {
            filter: advancedFilterQueries.filters,
          },
        },
      },
      _source: {
        includes: [NewHireFields.workflowassociated],
      },
    });

    console.log('-------------------');
    console.log(ans.hits.hits[0]._source?.workflowassociated);
    console.log('--------------------');

    if (result.updated) {
      this.logAndStore.info(
        `mode: UPDATE workflowassociated successful for newhire candidates with onboardingcandidateid: ${data.onboardingcandidateid}`,
        ModuleIds.Newhire,
        {
          data: data,
          accountid: data.accountid,
          onboardingcandidateid: data.onboardingcandidateid,
        }
      );
    } else {
      this.logAndStore.error(
        `Failed to ${operation} workflowassociated with OnboardingCandidateid:${data.onboardingcandidateid} in newhire index`,
        ModuleIds.Newhire,
        {
          data: data,
        }
      );
    }
  }

I have also tried what you mentioned, printing the data from the GET API in Elasticsearch. In the above case, the GET API is not returning any data. I have checked in the Kibana console whether the data got updated or not: the workflowassociated field was not updated.

Kibana search result:

          "workflowassociated": [
            {
              "firstname": "testt",
              "workflowstatus": 1,
              "workflowname": "dont change this workflow - 777 multiple task to complete",
              "dateofjoining": "2024-03-12T00:00:00.000Z",
              "middlename": "",
              "progression": 0,
              "userid": 22,
              "lastname": "name 22",
              "activationdates": [
                "2024-03-13T06:22:44.000Z"
              ],
              "taskscompleted": 0,
              "recenttaskcompleted": null,
              "workflowid": 672,
              "email": "testname22@yopmail.com",
              "totaltasks": 1,
              "teamroleids": [
                2,
                5
              ]
            }
          ],

Please let me know if there are any mistakes on my end. Thanks.

Hi Neeraj,

I ran your script (manually, not via SQS) and I saw that the search() call returned a version of the document that didn't have the new workflowassociated entry (it only had one). But a few seconds later when I ran the same search in Kibana, the entry was there.

This leads me to believe that Elasticsearch's eventual consistency could be the cause here; quick subsequent fires of the update_by_query operation do not always see the most up-to-date version of the document, and so the previous insertion is lost.

To fix this, please try forcing a refresh in the update_by_query call:

  const result = await client.updateByQuery({
    index,
    body: {
      query: {
        bool: {
          filter: advancedFilterQueries.filters,
        },
      },
      script: script,
    },
    refresh: true  // <- here
  });

This will ensure all shards are updated with the result, but may incur some slowness.

If that doesn't work, then I'm afraid you'll need to debug the issue in your code and on your infrastructure. I'd try gradually narrowing the problem space down. E.g. does the same happen if the documents you're updating have a simple schema and the workflowassociated objects have a single field? Does the behavior change if you construct a new list and then assign it to ctx._source.workflowassociated, instead of adding the element to it with ctx._source.workflowassociated.add()? Etc.
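
For instance, that last variant could look something like this (only a sketch of the idea, assuming workflowassociated already exists on the document, so it's not a drop-in replacement for your whole script):

  const script: Script = {
    lang: 'painless',
    source: `
      // Build a fresh list instead of mutating the existing one in place.
      def updated = [];
      boolean replaced = false;
      for (def item : ctx._source.workflowassociated) {
        if (item.workflowid == params.updateDataObj[0].workflowid && item.userid == params.updateDataObj[0].userid) {
          updated.add(params.updateDataObj[0]); // replace the matching entry
          replaced = true;
        } else {
          updated.add(item); // keep everything else as-is
        }
      }
      if (!replaced) {
        updated.add(params.updateDataObj[0]); // no match: append as a new entry
      }
      ctx._source.workflowassociated = updated;
    `,
    params: {
      updateDataObj: data.workflowassociated,
    },
  };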

The TypeScript client should have nothing to do with this behavior; it's just a language-specific wrapper for API calls. If there is a race condition (or bug), it's likely in the way Elasticsearch handles the update_by_query request in conjunction with a Painless script.

I hope this helps!
