Elastic Ingest Pipeline with enrich processor to enrich nested objects

What is the correct syntax for the enrich processor to access field for enrichment for an object within an array?

This works to get access to field in single object, as shown in the example provided. This is what I am starting from.

POLICY DEFINITION:

PUT /_enrich/policy/country_enrichment_v1
{ 
  "match": { 
    "indices": [ 
      "country_enrichment_test"
    ], 
    "match_field": "countryName",
    "enrich_fields": [
      "continent", 
      "region_un", 
      "subregion", 
      "region_wb"
    ] 
  } 
}

EXECUTE POLICY and
DEFINE INGEST PIPELINE WITH ENRICH PROCESSOR

PUT _ingest/pipeline/enrich_country_data_v0
{ 
  "processors": [ 
    { 
      "enrich": {
        "policy_name": "country_enrichment_v1", 
        "field": "nationOfResidence.name", 
        "target_field": "nationOfResidenceEnriched" 
      } 
    }
  ] 
}

INSERT RECORD:

PUT /reports_enriched_v0/_doc/2?pipeline=enrich_country_data_v0
{
  "nationOfResidence": {
    "name": "Singapore"
  }
}

*** Note actual object has additional fields ***

RESULT:

{
  "_index" : "reports_enriched_v0",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "nationOfResidence" : {
      "name" : "Singapore"
    },
    "nationOfResidenceEnriched" : {
      "continent" : "Asia",
      "subregion" : "Eastern Asia",
      "region_wb" : "Eastern Asia",
      "region_un" : "Asia",
      "countryName" : "Singapore"
    }
  }
}

==============================================================
I want to do the same for same object field that is within an array.
I assume I want to wrap the Enrich processor within a Foreach processor?

If so , what is the correct syntax to permit the enrich processor to access the match field in the object?

DEFINE INGEST PIPELINE WITH ENRICH PROCESSOR

PUT _ingest/pipeline/enrich_country_data_v1 
{ 
  "processors": [ 
    { 
      "enrich": {
        "policy_name": "country_enrichment_v1", 
        "field": "nationOfResidence.name", 
        "target_field": "nationOfResidenceEnriched" 
      } 
    },
    { 
      "foreach": {
        "field": "nationsTravelled",
        "processor": {
          "enrich": {
            "policy_name": "country_enrichment_v1", 
            "field": "name",   <<== WHAT SHOULD GO HERE?  I have tried a number of things but no success yet
            "target_field": "nationsTravelledEnriched" 
          } 
        }
      }
    }
  ] 
}

INSERT RECORD WITH LIST OBJECTS:

PUT /reports_enriched_v0/_doc/2?pipeline=enrich_country_data_v1
{
  "nationOfResidence": {
    "name": "Singapore"
  },
"nationsTravelled": [
  {
    "name": "Japan",
     ...
  },
  {
    "name": "Germany",
    ....
  }
]
}

*** NOTE: Actual objects have additional fields defined ***

Recommendations? Thank you.

try _ingest._value.name as mentioned here

1 Like

Thanks - I did find the same solution for accessing a field in an object. Used foreach processor to walk list of objects and then _ingest.value.name to access a field.

Current Problem

Bigger issue that I have not found solution for is how to reach into an array within an array and get get an attribute for an object.

Below

SubPipeline Definition

This sub pipeline is called from the main pipeline defined in the simulation below. What I want to reach in _source is the list of namedItems.country.name

Using the foreach processor in the main pipeline on the namedItems array and sending to subpipeline, seems to get me access to array of entries in namedItems.country.

What I can not seem to find the correct way to reach is the name attribute for each namedItems.country

IDEAS?

PUT _ingest/pipeline/namedItemsSubPipeline 
{
  "processors": [
    {
      "append": {
        "field": "namedItemsCountries",
        "value": ["{{_ingest._value}}"]
      }
    },
    {
      "foreach": {
        "field": "namedItemsCountries",
        "processor": {
          "append": {
            "field": "namedItemsCountryNames",
            "value": ["{{_ingest._value.name}}"]
          }
        }
      }
    },
    { 
      "enrich": {
        "policy_name": "countries_with_enrichment", 
        "field": "namedItemsCountryNames", 
        "target_field": "namedItemsCountriesEnriched",
        "max_matches": 10
      } 
    }
  ]
}

Simulate Main Pipeline

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [ 
      { 
        "enrich": {
          "policy_name": "countries_with_enrichment", 
          "field": "nationOfResidence.name", 
          "target_field": "nationOfResidenceEnriched" 
        } 
      },
      {
        "foreach": {
          "field": "nationsTravelled",
          "processor": {
            "append": {
              "field": "nationsTravelledCountryNames",
              "value": ["{{_ingest._value.name}}"]
            }
          }
        }
      },
      { 
        "enrich": {
          "policy_name": "countries_with_enrichment", 
          "field": "nationsTravelledCountryNames", 
          "target_field": "nationsTravelledEnriched",
          "max_matches": 10
        } 
      },
      {
        "foreach": {
          "field": "namedItems",
          "processor": {
            "pipeline": {
              "name": "namedItemsSubPipeline"
            }
          }
        }
      }
    ] 
  },
  "docs": [
    {
      "_source": {
        "nationOfResidence": {
          "name": "Canada",
          "shortCode": "CA"
        },
        "nationsTravelled": [
          {
            "name": "United States of America",
            "shortCode": "US"
          },
          {
            "name": "Great Britain",
            "shortCode": "GB"
          }
        ],
        "namedItems": [
          {
            "itemName": "Item 1",
            "country": {
              "name": "Mexico",
              "shortCode": "MX"
            }
          },
          {
            "itemName": "Item 2",
            "country": {
              "name": "Australia",
              "shortCode": "AU"
            }
          }
        ]
      }
    }
  ]
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.