Set processor isn't working after another Set processor - Injest Pipeline

Hi,

I'm copying value of responseBody.results into embedded_original_value_obj both are
object type fields.

This is my defined pipeline -

PUT _ingest/pipeline/apiproxy-embedded-copy
{
  "processors": [
    {
   "set": {
        "field": "embedded_original_value_obj",
        "copy_from": "responseBody.results",
        "ignore_empty_value": true
      }
    },
    {
   "set": {
        "field": "embedded_error_code",
        "copy_from": "embedded_original_value_obj.statusCode",
        "ignore_empty_value": true
      }
    }
]
}

For Testing I'm using this data -

POST _ingest/pipeline/apiproxy-embedded-copy/_simulate
{
  "docs": [
    {
      "_source": {
        "responseBody": {
          "results": [
            {
              "userMessage": "SUCCESS",
              "hasErrors": false,
              "developerMessage": "SUCCESS",
              "customerId": "1420371438",
              "messageId": null,
              "uniqueId": "P0000049373",
              "errors": [],
              "statusCode": 201
            }
          ]
        }
      }
    }
  ]
}

output

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "embedded_original_value_obj" : [
            {
              "userMessage" : "SUCCESS",
              "hasErrors" : false,
              "developerMessage" : "SUCCESS",
              "customerId" : "1420371438",
              "messageId" : null,
              "uniqueId" : "P0000049373",
              "errors" : [ ],
              "statusCode" : 201
            }
          ],
          "responseBody" : {
            "results" : [
              {
                "userMessage" : "SUCCESS",
                "hasErrors" : false,
                "developerMessage" : "SUCCESS",
                "customerId" : "1420371438",
                "messageId" : null,
                "uniqueId" : "P0000049373",
                "errors" : [ ],
                "statusCode" : 201
              }
            ]
          }
        },
        "_ingest" : {
          "timestamp" : "2022-03-01T07:35:59.914902863Z"
        }
      }
    }
  ]
}

1st set is working and I'm getting value in embedded_original_value_obj but 2nd set isn't working. Any idea how can get value of embedded_original_value_obj.statusCode value into embedded_error_code :question:

Are you sure responseBody.results is an array, if not, you can do as follows

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "embedded_original_value_obj",
          "copy_from": "responseBody.results",
          "ignore_empty_value": true
        }
      },
      {
        "set": {
          "field": "embedded_error_code",
          "copy_from": "embedded_original_value_obj.statusCode",
          "ignore_empty_value": true
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "responseBody": {
          "results": {
            "userMessage": "SUCCESS",
            "hasErrors": false,
            "developerMessage": "SUCCESS",
            "customerId": "1420371438",
            "messageId": null,
            "uniqueId": "P0000049373",
            "errors": [],
            "statusCode": 201
          }
        }
      }
    }
  ]
}
1 Like

Hi @casterQ ,
This could be working fine for me (and it did work with your above data ) if results was a json object but it is actually a list. The responseBody data comes as -

"responseBody" : {
            "results" : [
              {
                "statusCode" : 201
              }
            ]
          }

if it is a list(or array), so there will be many statusCode here,so you want embedded_error_code field to be as shown below?

"embedded_error_code":[201,202,203]

There will be only 1 statuscode per document i.e. results[] only contain 1 statusCode within a responseBody.

I want embedded_error_code to be created for every doc where embedded_original_value_obj got created.

so structure should be -
"embedded_error_code" : 201

Does this meet your needs?

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "embedded_original_value_obj",
          "copy_from": "responseBody.results",
          "ignore_empty_value": true
        }
      },
      {
        "foreach": {
          "field": "embedded_original_value_obj",
          "processor": {
            "set": {
              "field": "embedded_error_code",
              "if":"ctx.embedded_error_code==null",
              "value": "{{_ingest._value.statusCode}}"
            }
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "responseBody": {
          "results": [
            {
              "userMessage": "SUCCESS",
              "hasErrors": false,
              "developerMessage": "SUCCESS",
              "customerId": "1420371438",
              "messageId": null,
              "uniqueId": "P0000049373",
              "errors": [],
              "statusCode": 201
            },
            {
              "userMessage": "SUCCESS",
              "hasErrors": false,
              "developerMessage": "SUCCESS",
              "customerId": "1420371438",
              "messageId": null,
              "uniqueId": "P0000049373",
              "errors": []
            }
          ]
        }
      }
    }
  ]
}
1 Like

is it ok?

rename statusCode to embedded_error_code :

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "embedded_original_value_obj",
          "copy_from": "responseBody.results",
          "ignore_empty_value": true
        }
      },
      {
        "foreach": {
          "field": "embedded_original_value_obj",
          "processor": {
            "rename": {
              "field": "_ingest._value.statusCode",
              "target_field": "_ingest._value.embedded_error_code"
            }
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "responseBody": {
          "results": [
            {
              "userMessage": "SUCCESS",
              "hasErrors": false,
              "developerMessage": "SUCCESS",
              "customerId": "1420371438",
              "messageId": null,
              "uniqueId": "P0000049373",
              "errors": [],
              "statusCode": 201
            },
            {
              "userMessage": "SUCCESS",
              "hasErrors": false,
              "developerMessage": "SUCCESS",
              "customerId": "1420371438",
              "messageId": null,
              "uniqueId": "P0000049373",
              "errors": [],
              "statusCode": 202
            }
          ]
        }
      }
    }
  ]
}

Add additional field embedded_error_code:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "embedded_original_value_obj",
          "copy_from": "responseBody.results",
          "ignore_empty_value": true
        }
      },
      {
        "foreach": {
          "field": "embedded_original_value_obj",
          "processor": {
            "set": {
              "field": "_ingest._value.embedded_error_code",
              "copy_from": "_ingest._value.statusCode"
            }
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "responseBody": {
          "results": [
            {
              "userMessage": "SUCCESS",
              "hasErrors": false,
              "developerMessage": "SUCCESS",
              "customerId": "1420371438",
              "messageId": null,
              "uniqueId": "P0000049373",
              "errors": [],
              "statusCode": 201
            },
            {
              "userMessage": "SUCCESS",
              "hasErrors": false,
              "developerMessage": "SUCCESS",
              "customerId": "1420371438",
              "messageId": null,
              "uniqueId": "P0000049373",
              "errors": [],
              "statusCode": 202
            }
          ]
        }
      }
    }
  ]
}

Hi @casterQ ,
It worked with _simulate (above code you mentioned) but it's giving me an error while I reindexed my data.

"cause" : {
        "type" : "illegal_argument_exception",
        "reason" : "field [embedded_original_value_obj] not present as part of path [embedded_original_value_obj]"
      },

Which of the above three DSL is you need?

Hi @casterQ ,
1st pipeline you gave is working for now!!!

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "embedded_original_value_obj",
          "copy_from": "responseBody.results",
          "ignore_empty_value": true
        }
      },
      {
        "foreach": {
          "field": "embedded_original_value_obj",
          "processor": {
            "set": {
              "field": "embedded_error_code",
              "if":"ctx.embedded_error_code==null",
              "value": "{{_ingest._value.statusCode}}"
            }
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "responseBody": {
          "results": [
            {
              "userMessage": "SUCCESS",
              "hasErrors": false,
              "developerMessage": "SUCCESS",
              "customerId": "1420371438",
              "messageId": null,
              "uniqueId": "P0000049373",
              "errors": [],
              "statusCode": 201
            }
          ]
        }
      }
    }
  ]
}

Thank you so much for quick response, really appreciate your efforts. :pray:

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.