How to dynamically update documents in an ingest node pipeline?

I have a list of string values, which are essentially document IDs, along with another field value that I would like to update on those documents.
I am trying to use an ingest node pipeline because I could not achieve this with Logstash (the data is prepared in a Logstash filter and would need to be updated in a loop in the output plugin).

Below is the sample data.

POST _ingest/pipeline/my_index/_simulate
{
  "docs": [
    {
      "_source": {
        "brand_id": "hello",
        "brand_name": "HI",
        "models": [
          {
            "model_number": "Test1",
            "model_id": 123
          },
          {
            "model_number": "Test2",
            "model_id": 456
          }
        ]
      }
    }
  ]
}

The structure of the model index is simple:

model_id
model_number
brand  {
   brand_id,
   brand_number
}

The brand index has a similar mapping, except that models is an array there. Whenever a brand is updated or inserted, I query the model index through the Logstash elasticsearch filter to find the models that belong to that brand, and build a list/array of model_id values.
My ultimate goal is to update both the brand index and the model index, so that every related model carries the updated or newly inserted brand details.
By the time I reach the Logstash output plugin (after the aggregate filter), I have the brand data ready along with the array of models to be updated. Since I cannot update the models in a loop within the Logstash output, I am trying to process them through an ingest pipeline.
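To make the intended result concrete, here is a minimal Python sketch of the fan-out I am after: one partial update per related model, expressed as Bulk API action lines. This is not part of my setup. The helper names `build_model_updates` and `to_ndjson` and the index name `model` are hypothetical, and it assumes each model document's _id equals its model_id. The brand fields are taken from the sample document above.

```python
import json


def build_model_updates(brand_doc, index="model"):
    """Build Bulk API lines (action line, then partial doc) for each model of a brand.

    Assumption: the _id of each model document is its model_id, and the
    index name "model" is only a placeholder.
    """
    brand = {
        "brand_id": brand_doc["brand_id"],
        "brand_name": brand_doc["brand_name"],
    }
    lines = []
    for model in brand_doc.get("models", []):
        # Action line: update the model document identified by model_id.
        lines.append({"update": {"_index": index, "_id": str(model["model_id"])}})
        # Partial document: only the embedded brand object is replaced.
        lines.append({"doc": {"brand": brand}})
    return lines


def to_ndjson(lines):
    """Serialize the lines as newline-delimited JSON, as the _bulk endpoint expects."""
    return "".join(json.dumps(line) + "\n" for line in lines)


# Sample brand document, copied from the simulate request above.
brand_doc = {
    "brand_id": "hello",
    "brand_name": "HI",
    "models": [
        {"model_number": "Test1", "model_id": 123},
        {"model_number": "Test2", "model_id": 456},
    ],
}

actions = build_model_updates(brand_doc)
print(to_ndjson(actions), end="")
```

The resulting lines are what I would want sent to the _bulk endpoint; whether this can be driven from a Logstash output or from an ingest pipeline is exactly what I am unsure about.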

Could you please help me with how this should be done? Is there a better way to do this kind of many-to-many update? Is it possible in Logstash, or is an ingest node the better option?

Below is what I am trying to accomplish:

PUT _ingest/pipeline/my_index
{
    "description": "use index:my_index and type:_doc",
    "processors": [
      {
        "foreach": {
          "field": "models",
          "processor": {
            "script": {
              "source": """
                ctx._index = 'my_index';
                ctx._type = '_doc';
                ctx._id = '%{_ingest._value.model_id}';     // fails to find the model_id
                // <Want to update the brand data here>
              """
            }
          }
        }
      }
    ]
}

Thanks in advance and sorry for my long post!
