Creating an enrich policy from collapsed data

Hi Elastic community

I have an index containing many documents per host.id describing the currently running software. This information might change over time, due to software updates.

I can use a collapse query to get the latest entry of each software info from the index, like this:

GET /filebeat-*/_search
{
  "_source": false,
  "query": {
    "term": { "json.logtype": "software" }
  },
  "collapse": {
    "field": "host.id",
    "inner_hits": {
      "name": "most_recent",
      "size": 1,
      "sort": [
        { "@timestamp": { "order": "desc" } }
      ]
    }
  }
}

The information I need is then found inside inner_hits:

    "hits" : [
      {
        "_index" : "filebeat-2022.09.11",
        "_type" : "_doc",
        "_id" : "IhjwLoMBm1GMkVa4CZZ-",
        "_score" : 9.112949,
        "fields" : {
          "host.id" : [
            "8f3a09ac069a4ba885e860a011f8570d"
          ]
        },
        "inner_hits" : {
          "most_recent" : {
            "hits" : {
              "total" : {
                "value" : 215,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "filebeat-2022.11",
                  "_type" : "_doc",
                  "_id" : "FDg4YIQBhix3UBkMF9Y-",
                  "_score" : null,
                  "_source" : {
                    "@timestamp" : "2022-11-10T06:26:27.423Z",
                    "host": {
                      "id" : "8f3a09ac069a4ba885e860a011f8570d",
                      ...
                    },
                    "json" : {
                      "logtype" : "software",
                      "software" : {
                        "build" : "2021-12-07 10:11:51",
                        "version" : "7.5.0 Build 199",
                        ...
                      }
                ...

Now I want to create an enrich index from exactly the data inside inner_hits, so that I can add the software version to other incoming data from these systems. I can set up the enrich policy and generate the index like this:

PUT /_enrich/policy/enrich-filebeat-software
{
  "match": {
    "indices": [
      "filebeat-*"
    ],
    "match_field": "host.id",
    "enrich_fields": [
      "json.software"
    ],
    "query": {
      "term": {
        "json.logtype": {
          "value": "software"
        }
      }
    }
  }
}

But the resulting enrich index contains all documents for each host.id, not just the latest one, which makes the enrich index overly large and the policy slow to execute.

How could I generate an enrich index from the inner_hits of my initial search? Do I need to create an intermediate index from which I generate the enrich index?

Any pointers are greatly appreciated.

Best regards,
Christian

Hi community,

I figured out how to do this and am posting my findings here so they can be reused if needed.

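I use a watcher that runs the collapse query once a day, writes the most recent document per host.id into an intermediate index (watch-filebeat-software), and then triggers execution of the enrich policy. My watcher looks like this: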

PUT _watcher/watch/watch-filebeat-software
{
  "trigger": {
    "schedule": {
      "interval": "24h"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": [
          "filebeat-*"
        ],
        "body": {
          "size": 10000,
          "_source": false,
          "query": {
            "term": {
              "json.logtype": {
                "value": "software"
              }
            }
          },
          "collapse": {
            "field": "host.id",
            "inner_hits": {
              "name": "most_recent",
              "size": 1,
              "sort": [
                { "@timestamp": "desc" }
              ],
              "_source": [
                "host.id",
                "json.software",
                "@timestamp"
              ]
            }
          }
        }
      }
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": """
          // Turn every collapsed hit (one per host.id) into a document for the index action.
          def docs = new ArrayList();
          for (def hit : ctx.payload.hits.hits) {
            // inner_hits is limited to size 1 and sorted by @timestamp desc,
            // so element 0 is the most recent entry for this host.id.
            def latest = hit.inner_hits.most_recent.hits.hits[0]._source;
            docs.add([
              "_index": "watch-filebeat-software",
              // host.id is used as the document id: one document per host.
              "_id": latest.host.id,
              "host": latest.host,
              "json": latest.json,
              "@timestamp": latest['@timestamp']
            ]);
          }
          // The documents are handed to the index action under the "_doc" key.
          return [ "_doc": docs ];
        """
      },
      "index": {}
    },
    "enrich": {
      "webhook": {
        "method": "PUT",
        "scheme": "https",
        "host": "********.elastic-cloud.com",
        "port": 9243,
        "path": "/_enrich/policy/enrich-filebeat-software/_execute",
        "headers": {
          "Authorization": "ApiKey *****************************************************=="
        }
      }
    }
  }
}

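The enrich policy is configured like this (an enrich policy cannot be changed once it has been created, so an existing policy with the same name has to be deleted first):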

PUT /_enrich/policy/enrich-filebeat-software
{
  "match": {
    "indices": ["watch-filebeat-software"],
    "match_field": "host.id",
    "enrich_fields": ["json"],
    "query": {
      "match_all": {}
    }
  }
}

With this I have an index watch-filebeat-software which is small (one entry per host.id) and is updated by the watcher once a day. At the same time the enrich policy is executed, so the enrich index is kept up to date.

Best regards,
Christian