Enrich processor: enrich multiple documents into one array

I have two indices, index1 and index2.

Let's say each document in index1 has the fields

[ "host.name", "ip"]

And each document in index2 has the fields

[ "host.name", "software"]

index2 can contain multiple documents with the same host.name value, one per software.

What I want to achieve is one unified document containing host.name, ip and an array of software values.

I set up an enrich policy and an ingest pipeline with an enrich processor, executed the policy, and indexed a document via the pipeline. What I got is a document containing host.name, ip and a single, seemingly random software value from index2.

Here is my enrich policy setup:

PUT _enrich/policy/host-soft
{
  "match": {
    "indices": ["index2"],
    "match_field": "host.name",
    "enrich_fields": ["software"]
  }
}
POST _enrich/policy/host-soft/_execute
PUT _ingest/pipeline/host-soft
{
  "description": "enrich data from index2",
  "processors": [
    {
      "enrich": {
        "policy_name": "host-soft",
        "field": "host.name",
        "target_field": "softwares"
      }
    }
  ]
}
PUT /index1/_doc/test1?pipeline=host-soft
{
  "host": {
    "name": "host1"
  },
  "ip": "127.0.0.1"
}
GET /index1/_doc/test1

Result

{
  "host": {
    "name": "host1"
  },
  "ip": "127.0.0.1",
  "softwares": {
    "software": "soft3"
  }
}

Desired document

{
  "host": {
    "name": "host1"
  },
  "ip": "127.0.0.1",
  "softwares": [ "soft1", "soft2", "soft3", "soft4", "soft5" ]
}

Hello,

I would suggest using Transform Jobs.

Some comments:

  • note that this solution might be sub-optimal
  • it doesn't act at indexing time; it runs after you send data to index1 (the hosts & IPs) and index2 (the software), creating a third index with the merged data; the job can run continuously
  • each host will be enriched with any new data coming in; it has no concept of time
  • it could be improved by using a HashSet instead of a List to accumulate the values, but there's a bug which prevents that: https://github.com/elastic/elasticsearch/issues/54708
  • we could avoid the scripted_metric aggregation for ip if we enriched index2 with the data from index1 (the opposite of what you've done), and then ran the Transform job grouping by host.name.keyword and ip.keyword (see the sketch right after this list)
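
For completeness, here is a rough, untested sketch of that alternative. The policy name host-ip, the pipeline name and the target_field host_ip are made up for illustration:

PUT _enrich/policy/host-ip
{
  "match": {
    "indices": ["index1"],
    "match_field": "host.name",
    "enrich_fields": ["ip"]
  }
}
POST _enrich/policy/host-ip/_execute
PUT _ingest/pipeline/host-ip
{
  "processors": [
    {
      "enrich": {
        "policy_name": "host-ip",
        "field": "host.name",
        "target_field": "host_ip"
      }
    }
  ]
}

Documents sent to index2 through ?pipeline=host-ip would then carry the matching ip under host_ip, so the Transform could group by host.name.keyword and host_ip.ip.keyword and keep only the softwares scripted_metric aggregation.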

Demo data

DELETE index1,index2
POST index1/_doc/1
{
  "host": { "name": "host1" },
  "ip": "ip1"
}
POST index1/_doc/2
{
  "host": { "name": "host2" },
  "ip": "ip2"
}
POST index1/_doc/3
{
  "host": { "name": "host3" },
  "ip": "ip3"
}
POST index2/_doc/1
{
  "host": { "name": "host1" },
  "software": "sw1"
}
POST index2/_doc/11
{
  "host": { "name": "host1" },
  "software": "sw2"
}
POST index2/_doc/111
{
  "host": { "name": "host1" },
  "software": "sw3"
}
POST index2/_doc/2
{
  "host": { "name": "host2" },
  "software": "sw2"
}
POST index2/_doc/3
{
  "host": { "name": "host3" },
  "software": "sw3"
}

Transform Job

PUT _transform/merge
{
  "source": {
    "index": [
      "index1",
      "index2"
    ]
  },
  "pivot": {
    "group_by": {
      "host.name.keyword": {
        "terms": {
          "field": "host.name.keyword"
        }
      }
    },
    "aggregations": {
      "softwares": {
        "scripted_metric": {
          "init_script": "state.sw = [];",
          "map_script": "if(doc.containsKey('software.keyword')) { state.sw.add(doc['software.keyword'].value) }",
          "combine_script": "def merged = []; for (s in state.sw) { merged.add(s) } return merged",
          "reduce_script": "def merged = new HashSet(); for (s in states) { merged.addAll(s) } return merged.asList()"
        }
      },
      "ip": {
        "scripted_metric": {
          "init_script": "state.ip = [];",
          "map_script": "if(doc.containsKey('ip.keyword')) { state.ip.add(doc['ip.keyword'].value) }",
          "combine_script": "def merged = []; for (s in state.ip) { merged.add(s) } return merged",
          "reduce_script": "def merged = new HashSet(); for (s in states) { merged.addAll(s) } return merged.asList()"
        }
      }
    }
  },
  "dest": {
    "index": "index3"
  }
}
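
One step not shown above: the Transform has to be started before it writes anything to index3. The result below is then just a search on the destination index:

POST _transform/merge/_start

GET index3/_search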

Result

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "index3",
        "_type" : "_doc",
        "_id" : "aD5SPQr7NAeQCBRuOWuMb2wAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "softwares" : [
            "sw1",
            "sw3",
            "sw2"
          ],
          "ip" : [
            "ip1"
          ],
          "host" : {
            "name" : {
              "keyword" : "host1"
            }
          }
        }
      },
      {
        "_index" : "index3",
        "_type" : "_doc",
        "_id" : "aHet-JFbw3tQznvEQGYEVl4AAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "softwares" : [
            "sw2"
          ],
          "ip" : [
            "ip2"
          ],
          "host" : {
            "name" : {
              "keyword" : "host2"
            }
          }
        }
      },
      {
        "_index" : "index3",
        "_type" : "_doc",
        "_id" : "aOKO0hBR3Wx-CcSI66_2d8kAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "softwares" : [
            "sw3"
          ],
          "ip" : [
            "ip3"
          ],
          "host" : {
            "name" : {
              "keyword" : "host3"
            }
          }
        }
      }
    ]
  }
}
