Creating an enrich policy from collapsed data

Hi Elastic community,

I have an index containing many documents per host.id that describe the currently running software. This information can change over time due to software updates.

I can use a collapse query to get the latest software entry for each host from the index, like this:

GET /filebeat-*/_search
{
  "query": {
    "term": {
      "json.logtype": {
        "value": "software"
      }
    }
  },
  "collapse": {
    "field": "host.id",
    "inner_hits": {
      "name": "most_recent",
      "size": 1,
      "sort": [{"@timestamp": "desc"}]
    }
  },
  "_source": false
}

The info I need then appears inside inner_hits:

    "hits" : [
      {
        "_index" : "filebeat-2022.09.11",
        "_type" : "_doc",
        "_id" : "IhjwLoMBm1GMkVa4CZZ-",
        "_score" : 9.112949,
        "fields" : {
          "host.id" : [
            "8f3a09ac069a4ba885e860a011f8570d"
          ]
        },
        "inner_hits" : {
          "most_recent" : {
            "hits" : {
              "total" : {
                "value" : 215,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "filebeat-2022.11",
                  "_type" : "_doc",
                  "_id" : "FDg4YIQBhix3UBkMF9Y-",
                  "_score" : null,
                  "_source" : {
                    "@timestamp" : "2022-11-10T06:26:27.423Z",
                    "host": {
                      "id" : "8f3a09ac069a4ba885e860a011f8570d",
                      ...
                    },
                    "json" : {
                      "logtype" : "software",
                      "software" : {
                        "build" : "2021-12-07 10:11:51",
                        "version" : "7.5.0 Build 199",
                        ...
                      }
                ...

Now I want to create an enrich index from exactly the data inside inner_hits, in order to add the software version to other incoming data from these systems. I can set up the enrich policy and generate the index like this:

PUT /_enrich/policy/enrich-filebeat-software
{
  "match": {
    "indices": [
      "filebeat-*"
    ],
    "match_field": "host.id",
    "enrich_fields": [
      "json.software"
    ],
    "query": {
      "terms": {
        "json.logtype": {
          "value": "software"
        }
      }
    }
  }
}

But this includes all documents for each host.id, not just the latest one, making the enrich index overly large and slow to build.
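For context, the enrich index itself is only (re)built when the policy is executed, which is where the long runtime shows up:

POST /_enrich/policy/enrich-filebeat-software/_execute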

How could I generate an enrich index from the inner_hits of my initial search? Do I need to create an intermediate index from which I generate the enrich index?

Any pointers are greatly appreciated.

Best regards,
Christian

Hi community,

I figured out how to do this; I'll post my findings here so they can be reused if needed.

My watcher looks like this:

PUT _watcher/watch/watch-filebeat-software
{
  "trigger": {
    "schedule": { "interval": "24h" }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["filebeat-*"],
        "body": {
          "query": {
            "term": {
              "json.logtype": {
                "value": "software"
              }
            }
          },
          "size": 10000,
          "collapse": {
            "field": "host.id",
              "inner_hits": {
                "name": "most_recent",
                "size": 1,
                "sort": [{"@timestamp": "desc"}],
                "_source": ["host.id", "json.software", "@timestamp"]
              }
            },
            "_source": false
        }
      }
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": """
          def documents = ctx.payload.hits.hits.stream()
            .map(hit -> [
              "_index": "watch-filebeat-software",
              "_id": hit.inner_hits.most_recent.hits.hits.0._source.host.id,
              "host": hit.inner_hits.most_recent.hits.hits.0._source.host,
              "json": hit.inner_hits.most_recent.hits.hits.0._source.json,
              "@timestamp": hit.inner_hits.most_recent.hits.hits.0._source['@timestamp']
            ])
            .collect(Collectors.toList());
          return [ "_doc": documents ];
        """
      },
      "index": {}
    },
    "enrich": {
      "webhook": {
        "method": "PUT",
        "scheme": "https",
        "host": "********.elastic-cloud.com",
        "port": 9243,
        "path": "/_enrich/policy/enrich-filebeat-software/_execute",
        "headers": {
          "Authorization": "ApiKey *****************************************************=="
        }
      }
    }
  }
}
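For testing, the watch can be triggered manually with the standard Watcher execute API instead of waiting for the 24h schedule:

POST _watcher/watch/watch-filebeat-software/_execute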

The enrich policy is configured like this:

PUT /_enrich/policy/enrich-filebeat-software
{
  "match": {
    "indices": [
      "watch-filebeat-software"
    ],
    "match_field": "host.id",
    "enrich_fields": [
      "json"
    ],
    "query": { "match_all": {} }
  }
}

With this I have an index watch-filebeat-software which is small (one entry per host.id) and is refreshed by the watcher once a day. The enrich policy is executed at the same time, so the enrich index is kept up to date.
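To actually apply the enrichment to incoming documents, the policy can be referenced from an ingest pipeline with an enrich processor. A minimal sketch (the pipeline name and target_field here are placeholders of my choosing):

PUT _ingest/pipeline/add-software-info
{
  "processors": [
    {
      "enrich": {
        "policy_name": "enrich-filebeat-software",
        "field": "host.id",
        "target_field": "software_info"
      }
    }
  ]
}

Incoming documents that carry a host.id and pass through this pipeline then get the matched json.software data under software_info.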

Best regards,
Christian
