Using a transform with an enrich pipeline to enrich IPs with the latest vulnerabilities

Hi,

I want to enrich incoming events that contain a local source.ip/destination.ip with a vulnerability field, using this logic:
match source.ip against server.ip, then take the highest vulnerability.severity_num from the latest vulnerability.report_date.

I created an index with vulnerabilities; each document contains server.ip (IP), vulnerability.report_date (date), vulnerability.severity_num (number), and vulnerability.scan_date (date). I have a script that pulls report data from the vulnerability scanner and sends it to Logstash, which parses it and stores it in the vulnerabilities index on a daily basis. The field vulnerability.scan_date serves as the event date (@timestamp).
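
The mapping is along these lines (simplified; severity_num could equally be a double or scaled_float):

{
  "mappings": {
    "properties": {
      "server": {
        "properties": {
          "ip": { "type": "ip" }
        }
      },
      "vulnerability": {
        "properties": {
          "report_date": { "type": "date" },
          "scan_date": { "type": "date" },
          "severity_num": { "type": "float" }
        }
      }
    }
  }
}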

Now, I tried to enrich incoming events in Logstash. It works, but it's slow - at 1k eps it has to run 2k queries against Elasticsearch, and events start to lag in Logstash.

So I decided to move this logic to Elasticsearch's enrich pipeline. The problem with this is that the enrich processor cannot perform an aggregated query, only a direct match. The cure is to create another index keyed by server.ip that holds the latest highest vulnerability.

I found transforms, which could transform my vulnerabilities index into a latest_vulnerabilities index, but here I'm stuck. I tried a pivot transform with this logic:

  1. group by: server.ip
     aggregations: vulnerability.report_date, vulnerability.severity_num
  • this gives me one document per IP; the latest report is correct, but the highest severity is taken from all reports, not only from the latest one
  2. group by: server.ip, vulnerability.report_date
     aggregations: vulnerability.severity_num
  • this gives me one document per report_date with the correct highest severity, but there are multiple documents with the same server.ip and I'm not sure which one the enrich processor will take.
  3. sub-aggregation:
{
  "source": {
    "index": "vulnerabilities",
    "query": {
      "bool": {
        "should": [
          {
            "exists": {
              "field": "vulnerability.severity_num"
            }
          }
        ],
        "minimum_should_match": 1
      }
    }
  },
  "pivot": {
    "group_by": {
      "server_ip": {
        "terms": {
          "field": "server.ip",
          "missing_bucket": true
        }
      }
    },
    "aggregations": {
      "max_report": {
        "max": {
          "field": "vulnerability.report_date"
        },
        "aggs": {
          "max_vuln": {
            "max": {
              "field": "vulnerability.severity_num"
            }
          }
        }
      }
    }
  },
  "description": "Latest vulnerability per ip",
  "dest": {
    "index": "vulnerabilities_latest"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "vulnerability.scan_date",
      "delay": "60s"
    }
  },
  "retention_policy": {
    "time": {
      "field": "vulnerability.scan_date",
      "max_age": "30d"
    }
  }
}
  • but this does not work
    "reason" : "Aggregator [max_report] of type [max] cannot accept sub-aggregations"

How can I achieve my goal?

Thank you for your help.

To get the severity from the latest report have a look at top_metrics. You get the latest by specifying sort on a timestamp field with order desc.

Hope that helps.

Hi Hendrik,

thank you for your answer. Unfortunately I cannot use "latest", because I'm not looking for the latest vulnerability of a given server.ip, but for the highest vulnerability from the latest report of that server.ip. I also can't select a numeric field such as severity_num when using "latest".

I tried to use a pivot with top_metrics on this dataset:
|server.ip|vulnerability.report_date|vulnerability.severity_num|
|10.1.2.3|Dec 5, 2022 @ 16:00:14.000|5|
|10.1.2.3|Dec 5, 2022 @ 16:00:14.000|5.3|
|10.1.2.3|Dec 5, 2022 @ 16:00:14.000|7.5|
|10.1.2.3|Dec 5, 2022 @ 16:00:14.000|2.4|
|10.1.2.3|Dec 5, 2022 @ 16:00:14.000|1.3|

and this transform:

{
  "group_by": {
    "server.ip": {
      "terms": {
        "field": "server.ip"
      }
    }
  },
  "aggregations": {
    "top_metrics": {
      "top_metrics": {
        "metrics": [
          {
            "field": "vulnerability.severity_num"
          }
        ],
        "sort": {
          "vulnerability.report_date": "desc"
        }
      }
    }
  }
}

but it returns 5 instead of 7.5. I also tried adding a sort on vulnerability.severity_num, but the result was the same:

{
  "group_by": {
    "server.ip": {
      "terms": {
        "field": "server.ip"
      }
    }
  },
  "aggregations": {
    "top_metrics": {
      "top_metrics": {
        "metrics": [
          {
            "field": "vulnerability.severity_num"
          }
        ],
        "sort": {
          "vulnerability.report_date": "desc",
          "vulnerability.severity_num": "desc"
        }
      }
    }
  }
}

Why doesn't the sort take the other field (vulnerability.severity_num) into account?

Thank you for your help.

Regards, Sigi

Top metrics doesn't support more than one sort criterion. I wonder why it even lets you specify two. I will check and report an issue if needed.

I see two options: a scripted_metric that implements your own version of top_metrics but with a secondary sort, or a runtime field that combines the report date and the severity.
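
An untested sketch of the scripted_metric option, just to illustrate the idea - per server.ip bucket it keeps the highest severity among the documents with the newest report date (field names taken from your index):

{
  "aggregations": {
    "latest_report_max_severity": {
      "scripted_metric": {
        "init_script": "state['date'] = 0L; state['severity'] = 0.0;",
        "map_script": "long d = doc['vulnerability.report_date'].value.toInstant().toEpochMilli(); double s = doc['vulnerability.severity_num'].value; if (d > state['date']) { state['date'] = d; state['severity'] = s; } else if (d == state['date'] && s > state['severity']) { state['severity'] = s; }",
        "combine_script": "return state;",
        "reduce_script": "long bestDate = 0L; double bestSev = 0.0; for (s in states) { if (s == null) { continue; } if (s['date'] > bestDate) { bestDate = (long) s['date']; bestSev = (double) s['severity']; } else if (s['date'] == bestDate && s['severity'] > bestSev) { bestSev = (double) s['severity']; } } return ['report_date': bestDate, 'severity_num': bestSev];"
      }
    }
  }
}

Note that report_date comes back as epoch millis in this form, so you may want to define a date mapping for it in the destination index.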

However, first I'd like to understand:

Are the reports coming in regularly and all with the same date? Are those daily reports?

Hi Hendrik,

I see two options: a scripted_metric that implements your own version of top_metrics but with a secondary sort, or a runtime field that combines the report date and the severity.

OK, I'm going to take a deep dive into that.

Are the reports coming in regularly and all with the same date? Are those daily reports?

We can assume daily reports. In the earlier dataset there was only one report for one IP - this is a more general dataset:

server.ip	vulnerability.report_date	vulnerability.severity_num
10.1.2.3	Dec 5, 2022 @ 16:00:14.000	5
10.1.2.3	Dec 5, 2022 @ 16:00:14.000	5.3
10.1.2.3	Dec 5, 2022 @ 16:00:14.000	7.5
10.1.2.3	Dec 5, 2022 @ 16:00:14.000	2.4
10.1.2.3	Dec 5, 2022 @ 16:00:14.000	1.3
10.2.5.2	Dec 5, 2022 @ 16:00:14.000	2.5
10.2.5.2	Dec 5, 2022 @ 16:00:14.000	6.3
10.2.5.2	Dec 5, 2022 @ 16:00:14.000	4.2
10.2.3.2	Dec 5, 2022 @ 16:00:14.000	4.2
172.30.10.2	Dec 5, 2022 @ 16:00:14.000	10
172.30.10.2	Dec 5, 2022 @ 16:00:14.000	7.4
172.30.10.2	Dec 5, 2022 @ 16:00:14.000	1.4
172.30.10.2	Dec 5, 2022 @ 16:00:14.000	7.5
172.30.10.2	Dec 5, 2022 @ 16:00:14.000	6.5
10.1.2.3	Dec 4, 2022 @ 16:00:14.000	5
10.1.2.3	Dec 4, 2022 @ 16:00:14.000	5.3
10.1.2.3	Dec 4, 2022 @ 16:00:14.000	7.5
10.1.2.3	Dec 4, 2022 @ 16:00:14.000	2.4
10.1.2.3	Dec 4, 2022 @ 16:00:14.000	1.3
10.2.5.2	Dec 4, 2022 @ 16:00:14.000	2.5
10.2.5.2	Dec 4, 2022 @ 16:00:14.000	6.3
10.2.5.2	Dec 4, 2022 @ 16:00:14.000	4.2
10.2.3.2	Dec 4, 2022 @ 16:00:14.000	4.2
172.30.10.2	Dec 4, 2022 @ 16:00:14.000	10
172.30.10.2	Dec 4, 2022 @ 16:00:14.000	7.4
172.30.10.2	Dec 4, 2022 @ 16:00:14.000	1.4
172.30.10.2	Dec 4, 2022 @ 16:00:14.000	7.5
172.30.10.2	Dec 4, 2022 @ 16:00:14.000	6.5

There are 2 reports (Dec 4 and Dec 5), and each report consists of multiple IPs.
The transform of this dataset should produce the following:

server.ip	vulnerability.report_date	vulnerability.severity_num
10.1.2.3	Dec 5, 2022 @ 16:00:14.000	7.5
10.2.5.2	Dec 5, 2022 @ 16:00:14.000	6.3		
10.2.3.2	Dec 5, 2022 @ 16:00:14.000	4.2
172.30.10.2	Dec 5, 2022 @ 16:00:14.000	10

Thank you.

The transform docs contain painless examples that should help you to get started.

I suggest considering a date_histogram in group_by with a calendar interval of 1 day. The top_metrics can then sort on severity only. That way your transform destination isn't the latest severity anymore, but daily values. Maybe that's even useful for you.

To reduce storage you could age out old data by using a retention_policy. A retention_policy deletes documents when they are older than the specified time range.

In terms of performance, a transform with a date_histogram performs much better, because it doesn't have to look back and reload all historic data points. The current one, in contrast, re-iterates through all data points of a server IP on every run. With date_histogram + terms, the transform only aggregates data from at most 2 days in the past.
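
Roughly like this (destination index and group_by names are just examples):

{
  "source": {
    "index": "vulnerabilities"
  },
  "dest": {
    "index": "daily_max_vulnerabilities"
  },
  "pivot": {
    "group_by": {
      "server_ip": {
        "terms": {
          "field": "server.ip"
        }
      },
      "report_day": {
        "date_histogram": {
          "field": "vulnerability.report_date",
          "calendar_interval": "1d"
        }
      }
    },
    "aggregations": {
      "max_severity": {
        "top_metrics": {
          "metrics": [
            { "field": "vulnerability.severity_num" }
          ],
          "sort": {
            "vulnerability.severity_num": "desc"
          }
        }
      }
    }
  },
  "sync": {
    "time": {
      "field": "vulnerability.scan_date",
      "delay": "60s"
    }
  },
  "retention_policy": {
    "time": {
      "field": "report_day",
      "max_age": "30d"
    }
  }
}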

Hi Hendrik,

I found an easier way - I created 2 transforms:

  1. A pivot transform from the vulnerabilities index: group by server.ip and vulnerability.report_date, aggregate the max of vulnerability.severity_num, and output to a new index max_ip_vulnerabilities_by_latest_report_date.

  2. A latest transform from the max_ip_vulnerabilities_by_latest_report_date index: unique key server.ip, sort by vulnerability.report_date, output to a new index latest_vulnerabilities. This index contains the latest highest vulnerability for each IP, and each IP appears in it only once, so it can be used as the enrich match key (rough sketches below).
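
Roughly, the two transforms look like this (sync/frequency settings left out for brevity; the report date is grouped with a 1-day date_histogram since the reports are daily):

Transform 1 (pivot):

{
  "source": {
    "index": "vulnerabilities"
  },
  "dest": {
    "index": "max_ip_vulnerabilities_by_latest_report_date"
  },
  "pivot": {
    "group_by": {
      "server.ip": {
        "terms": {
          "field": "server.ip"
        }
      },
      "vulnerability.report_date": {
        "date_histogram": {
          "field": "vulnerability.report_date",
          "calendar_interval": "1d"
        }
      }
    },
    "aggregations": {
      "vulnerability.severity_num": {
        "max": {
          "field": "vulnerability.severity_num"
        }
      }
    }
  }
}

Transform 2 (latest):

{
  "source": {
    "index": "max_ip_vulnerabilities_by_latest_report_date"
  },
  "dest": {
    "index": "latest_vulnerabilities"
  },
  "latest": {
    "unique_key": ["server.ip"],
    "sort": "vulnerability.report_date"
  }
}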

Now I can create the enrich policy and ingest pipeline on top of the latest_vulnerabilities index.
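
Roughly (policy and pipeline names are just examples; enrich fields match the transform sketches above):

PUT /_enrich/policy/latest-vulnerability-policy
{
  "match": {
    "indices": "latest_vulnerabilities",
    "match_field": "server.ip",
    "enrich_fields": [
      "vulnerability.report_date",
      "vulnerability.severity_num"
    ]
  }
}

POST /_enrich/policy/latest-vulnerability-policy/_execute

PUT /_ingest/pipeline/enrich-source-ip-vulnerability
{
  "processors": [
    {
      "enrich": {
        "policy_name": "latest-vulnerability-policy",
        "field": "source.ip",
        "target_field": "vulnerability",
        "ignore_missing": true
      }
    }
  ]
}

A second enrich processor with "field": "destination.ip" covers the destination side. One thing to keep in mind: the enrich index is a snapshot of latest_vulnerabilities at execution time, so the policy needs to be re-executed periodically to pick up new data.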

Thank you for your help.
