Hi adam demjen,
I was using the Elasticsearch node client version @elastic/elasticsearch": "^8.10.0 .
Attaching my code below:
private async updateWorkflowAssociatedInNewhire(data: SQSNewHireDataInterface, operation: string) {
const indexName = await this.getIndexName(ModuleIds.Newhire, data.accountid);
const filters = new Array<QueryDslQueryContainer>();
filters.push({ term: { [NewHireFields.onboardingcandidateid]: data.onboardingcandidateid } });
const advancedFilterQueries = {
filters: filters,
};
const script: Script = {
source: `
if (ctx._source.containsKey('workflowassociated')) {
if (params.updateDataObj[0].teamroleids == null || params.updateDataObj[0].teamroleids.isEmpty()) {
ctx._source.workflowassociated.removeIf(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid)
} else {
if (ctx._source.workflowassociated.size() == 1 && ctx._source.workflowassociated[0].userid == null) {
ctx._source.workflowassociated = [];
ctx._source.workflowassociated.add(params.updateDataObj[0]);
} else {
def existingUser = ctx._source.workflowassociated.find(j -> j.workflowid == params.updateDataObj[0].workflowid && j.userid == params.updateDataObj[0].userid);
if (existingUser != null) {
existingUser.putAll(params.updateDataObj[0]);
} else {
ctx._source.workflowassociated.add(params.updateDataObj[0]);
}
}
}
} else {
ctx._source.workflowassociated = params.updateDataObj;
}
`,
params: {
updateDataObj: data.workflowassociated,
},
};
const result = await this.elasticClient.updateByQuery({
index: indexName,
body: {
query: {
bool: {
filter: advancedFilterQueries.filters,
},
},
script: script,
},
});
if (result.updated) {
this.logAndStore.info(
`mode: UPDATE workflowassociated successful for newhire candidates with onbaordingcandidateid: ${data.onboardingcandidateid}`,
ModuleIds.Newhire,
{
data: data,
accountid: data.accountid,
onboardingcandidateid: data.onboardingcandidateid,
}
);
} else {
this.logAndStore.error(
`Failed to ${operation} workflowassociated with OnboardingCandidateid:${data.onboardingcandidateid} in newhire index`,
ModuleIds.Newhire,
{
data: data,
}
);
}
}
public async newHireIndexManager(data: SQSNewHireDataInterface, operation: string): Promise<void> {
try {
// account-wise separation for new hire indices
const indexName = await this.getIndexName(ModuleIds.Newhire, data.accountid);
switch (operation) {
case IndexOperations.CREATE: {
if (Array.isArray(data.onboardingcandidateid) && data.onboardingcandidateid.length === 1) {
data.onboardingcandidateid = data.onboardingcandidateid[0];
} else if (typeof data.onboardingcandidateid !== 'number') {
throw new Error('Incorrect newhire onboardingcandidateid in sqs data. Must be a number or number[] of length 1');
}
const res = this.elasticClient.create({
index: indexName,
id: String(data.onboardingcandidateid),
body: data,
});
this.logAndStore.info(`mode: ${(await res).result} for newhire onboardingcandidateId: ${data.onboardingcandidateid}`, ModuleIds.Newhire, {
data: data,
accountid: data.accountid,
candidatecode: data.candidatecode,
});
break;
}
case IndexOperations.UPDATE: {
if (Array.isArray(data.onboardingcandidateid) && data.onboardingcandidateid.length === 1) {
data.onboardingcandidateid = data.onboardingcandidateid[0];
} else if (typeof data.onboardingcandidateid !== 'number') {
throw new Error('Incorrect newhire onboardingCandidateId in sqs data. Must be a number or number[] of length 1');
}
if (data.workflowassociated) {
try {
await this.updateWorkflowAssociatedInNewhire(data, operation);
} catch (error) {
this.logAndStore.error('mode: Error in UPDATE request', ModuleIds.Newhire, { data: data });
}
}
const res = this.elasticClient.update({
index: indexName,
id: String(data.onboardingcandidateid),
body: {
doc: data,
},
retry_on_conflict: 3 /* eslint-disable-line */, // REASON: property is part of elasticsearch api. cannnot change
});
this.logAndStore.info(`mode: ${(await res).result} for newhire onboardingCandidateId: ${data.onboardingcandidateid}`, ModuleIds.Newhire, {
data: data,
accountid: data.accountid,
candidatecode: data.candidatecode,
});
break;
}
case IndexOperations.DELETE: {
// if onboardingcandidateid is a number then delete new hire with onboardingcandidateid. (single operation)
if (!Array.isArray(data.onboardingcandidateid)) {
const res = this.elasticClient.delete({
index: indexName,
id: String(data.onboardingcandidateid),
});
this.logAndStore.info(`mode: ${(await res).result} for newhire onboardingCandidateId: ${data.onboardingcandidateid}`, ModuleIds.Newhire, {
data: data,
accountid: data.accountid,
candidatecode: data.candidatecode,
});
} else {
// data.onboardingcandidateid is an array of newhire candidateids. Delete Bulk!
const operations = data.onboardingcandidateid.map((id: number) => {
return { delete: { _index: indexName, _id: id } };
});
const res = this.elasticClient.bulk({ index: indexName, body: operations });
res
.then((res) => {
if (res.errors) {
const erroredDocuments = new Array<Record<string, unknown>>();
res.items.forEach((operation, i) => {
if (operation.delete?.error) {
erroredDocuments.push({
status: operation.delete.status,
error: operation.delete.error,
operation: operations[i * 2],
document: operations[i * 2 + 1],
});
}
});
this.logAndStore.error('mode: Errors in BULK_DELETE request', ModuleIds.Newhire, { erroredDocuments: erroredDocuments, data: data });
} else {
this.logAndStore.info(`mode: BULK_DELETE successful for new hire candidateId: ${data.onboardingcandidateid}`, ModuleIds.Newhire, {
data: data,
accountid: data.accountid,
candidatecode: data.candidatecode,
});
}
})
.catch((_error) => {
throw new Error('Error While Bulk Deletion');
});
}
break;
}
default:
throw new Error(`unidentified operation: ${operation}`);
}
} catch (error) {
this.logAndStore.error(`Failed to ${operation} Newhire with onboarding candidateId:${data.onboardingcandidateid}`, ModuleIds.Newhire, {
error: error,
data: data,
});
throw error;
}
}
export interface SQSNewHireDataInterface extends Omit<Partial<NewHireInterface>, 'accountid' | 'onboardingcandidateid' | 'candidatecode'> {
accountid: number;
onboardingcandidateid: number | number[];
candidatecode?: number | number[];
}
export interface WorkflowAssociationInterface {
workflowid: number;
workflowstatus: number;
workflowname: string;
userid: number;
firstname: string;
middlename: string;
lastname: string;
email: string;
teamroleids: number[];
activationdates: string[] | DateString[];
dateofjoining: string | DateString;
}
export interface NewHireInterface {
accountid: number;
onboardingcandidateid: number;
email: string;
firstname: string;
middlename: string;
lastname: string;
candidateid: number;
candidatecode: number;
jobid: number;
jobcode: number;
jobtitle: string;
joblocation: string;
officelocation: string;
offerlocation: string;
offerdesignation: string;
businessverticalid: string;
practiceid: string;
zoneid: string;
departmentid: number;
departmentname: string;
createdbyuserid: number;
createdbyuseremail: string;
createdbyuserfirstname: string;
createdbyusermiddlename: string;
createdbyuserlastname: string;
createdtime: string | DateString;
processedtouserfullname: string;
processedtouseremail: string;
processedbyuserfullname: string;
processedbyuseremail: string;
lastprocessedon: string | DateString;
lastlogin: string | DateString;
workflowassociated: WorkflowAssociationInterface[];
hirestatus: number;
recenttaskcompleted: string | DateString;
taskscompleted: number;
totaltasks: number;
progression: number;
}
In the above function I have defined the cases such that once I got the sequence of messages from SQS queue. It will process the data object and update the fields accordingly.
Sample SQS message:
{
"operation": "UPDATE",
"newHireData": {
"accountid": 215,
"onboardingcandidateid": 316,
"workflowassociated": [
{
"firstname": "testt",
"workflowstatus": 1,
"workflowname": "dont change this workflow - 777 multiple task to complete",
"dateofjoining": "2024-03-12T00:00:00.000Z",
"middlename": "",
"progression": 0,
"userid": 15,
"lastname": "name 15",
"activationdates": [
"2024-03-13T06:22:44.000Z"
],
"taskscompleted": 0,
"recenttaskcompleted": null,
"workflowid": 672,
"email": "testname15@yopmail.com",
"totaltasks": 1,
"teamroleids": [
2, 5
]
}
]
}
}
Try sending different workflowassociated object with different workflowids , userids and teamroleids. Try calling the API several times at once with diferent data that you have created similar to above. This was the scenario where I'm facing the issue with node client code.