Enrich pipeline - how to get sum of enriched values from array

Hi!
I'm trying to use an enrich pipeline on my data so that I can filter and sort my main index by new fields. These fields are originally stored in a separate index.

Let's say I have two indices:
index1 - main index where main documents are stored (recipient index);
index2 - additional index with fields I want to copy/enrich into index1 (donor index);

A document from index1 may have a one-to-many relation to documents in index2.

Example doc from index1:

PUT /index1/_doc/111
{
  "id": "111",
  "foo": "bar"
}

Example docs from index2:

PUT /index2/_doc/child-111
{
  "id": "child-111",
  "views": 4,
  "parents": [
    {
      "parent_id": "111",
      "foo": "bar"
    },
    {
      "parent_id": "333",
      "foo": "bar"
    }
    ]
}

PUT /index2/_doc/child-222
{
  "id": "child-222",
  "views": 10,
  "parents": [
    {
      "parent_id": "111",
      "foo": "bar"
    },
    {
      "parent_id": "333",
      "foo": "bar"
    }
    ]
}

I want to match documents from index2 by comparing parents.parent_id (a value inside an array of objects) with the id of a document from index1, and then write the sum of the matched views into the main doc in index1.

My enrich policy:

GET /_enrich/policy/test-enrich-summ
{
  "policies": [
    {
      "config": {
        "match": {
          "name": "test-enrich-summ",
          "indices": [
            "index2"
          ],
          "match_field": "parents.parent_id",
          "enrich_fields": [
            "views"
          ]
        }
      }
    }
  ]
}
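
For reference, the output above is what the GET request returns. A policy like this could be created and executed with requests along the following lines. This is a sketch reconstructed from the GET output, not the exact requests I ran; it assumes index2 already contains the child docs.

```console
# Create a match policy: look up index2 docs by parents.parent_id
# and copy their views field into the enriched document.
PUT /_enrich/policy/test-enrich-summ
{
  "match": {
    "indices": "index2",
    "match_field": "parents.parent_id",
    "enrich_fields": ["views"]
  }
}

# Build the enrich index from the current contents of index2.
# This has to be repeated whenever index2 changes.
PUT /_enrich/policy/test-enrich-summ/_execute
```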

My ingest pipeline:

[
  {
    "enrich": {
      "field": "id",
      "policy_name": "test-enrich-summ",
      "target_field": "enrich_data",
      "max_matches": 121,
      "ignore_missing": true
    }
  }
]
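
The array above is only the processors list. Registering it as a pipeline might look roughly like this; the pipeline name test-summ-pipeline is taken from the update request below, and the description text is just an illustrative assumption.

```console
# Store the processors under a pipeline name so that
# _update_by_query can reference it with ?pipeline=...
PUT /_ingest/pipeline/test-summ-pipeline
{
  "description": "Enrich index1 docs with views from index2",
  "processors": [
    {
      "enrich": {
        "field": "id",
        "policy_name": "test-enrich-summ",
        "target_field": "enrich_data",
        "max_matches": 121,
        "ignore_missing": true
      }
    }
  ]
}
```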

After executing the enrich policy, I update the docs in index1 with:

POST /index1/_update_by_query?pipeline=test-summ-pipeline

Resulting doc from index1:

GET /index1/_doc/111
{
  "_index": "index1",
  "_id": "111",
  "_version": 19,
  "_seq_no": 18,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "enrich_data": [
      {
        "views": 10,
        "parents": [
          {
            "parent_id": "111"
          },
          {
            "parent_id": "333"
          }
        ]
      },
      {
        "views": 4,
        "parents": [
          {
            "parent_id": "111"
          },
          {
            "parent_id": "222"
          }
        ]
      }
    ],
    "foo": "bar",
    "id": "111"
  }
}

The combined matched data was saved correctly into the enrich_data field. But I don't need an array of objects there; the parents and the separate views fields are not needed (except perhaps for debugging). I want a sum of views, something like an enrich_data.total_views field. How can I accomplish this?

Answering my own question: you can combine multiple processors into a single pipeline.

I added two more processors after the enrich processor:

  1. A script processor that sums views into a top-level total_views field.
  2. A remove processor that drops the enrich_data field from the main doc.

My final pipeline processors:

[
  {
    "enrich": {
      "field": "id",
      "policy_name": "test-enrich-summ",
      "target_field": "enrich_data",
      "max_matches": 121,
      "ignore_missing": true
    }
  },
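  {
    "script": {
      "source": "// Sum views over all matched enrich docs; total stays 0 if nothing matched\nint total_views = 0;\nif (ctx['enrich_data'] != null) {\n  for (item in ctx['enrich_data']) {\n    total_views += item['views'];\n  }\n}\nctx['total_views'] = total_views;"
    }
  },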
  {
    "remove": {
      "field": "enrich_data",
      "ignore_missing": true,
      "ignore_failure": true
    }
  }
]
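
Before running the update over the whole index, the pipeline can be tried on a single sample doc with the simulate API. The request below is a minimal sketch: it assumes the processors above are stored under the name test-summ-pipeline and that the enrich policy has already been executed.

```console
# Dry run: nothing is indexed, the response shows the transformed doc.
POST /_ingest/pipeline/test-summ-pipeline/_simulate
{
  "docs": [
    {
      "_index": "index1",
      "_id": "111",
      "_source": {
        "id": "111",
        "foo": "bar"
      }
    }
  ]
}
```

If both example child docs (views 4 and 10) match, the simulated doc should come back with total_views equal to 4 + 10 = 14 and without the enrich_data field.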

P.S.
If there is a more "elastic" way to accomplish this task, please let me know :)
