Hi,
I want to enrich incoming events that contain local source.ip/destination.ip fields with vulnerability data, using this logic:
match source.ip against server.ip, then take the highest vulnerability.severity_num from the latest vulnerability.report_date.
I created an index with vulnerabilities; each document contains server.ip (IP), vulnerability.report_date (date), vulnerability.severity_num (number), and vulnerability.scan_date (date). A script pulls report data from the vulnerability scanner daily and sends it to Logstash, which parses it and stores it in the vulnerabilities index. The field vulnerability.scan_date serves as the event date (@timestamp).
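For reference, the mapping of the vulnerabilities index looks roughly like this (a sketch based on the field types above; severity_num is shown as integer, adjust if yours is a float):

```
PUT vulnerabilities
{
  "mappings": {
    "properties": {
      "server": {
        "properties": { "ip": { "type": "ip" } }
      },
      "vulnerability": {
        "properties": {
          "report_date":  { "type": "date" },
          "scan_date":    { "type": "date" },
          "severity_num": { "type": "integer" }
        }
      }
    }
  }
}
```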
Now, I tried to enrich incoming events in Logstash. It works, but it's slow: at 1k eps it has to issue 2k queries per second to Elasticsearch (one lookup for source.ip and one for destination.ip), and events start lagging in Logstash.
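The Logstash lookup is roughly of this shape (a sketch using the elasticsearch filter plugin; the host, index name, and target field are placeholders), with a second analogous block for [destination][ip], hence two queries per event:

```
filter {
  # look up the matching server by source IP, newest report first
  elasticsearch {
    hosts  => ["localhost:9200"]
    index  => "vulnerabilities"
    query  => "server.ip:%{[source][ip]}"
    sort   => "vulnerability.report_date:desc"
    fields => { "[vulnerability][severity_num]" => "[source_vulnerability][severity_num]" }
  }
}
```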
So I decided to move this logic into an Elasticsearch enrich pipeline. The problem is that the enrich processor cannot perform an aggregated query, only a direct match. The cure for this is to create another index keyed by server.ip that holds the latest, highest vulnerability per IP.
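In other words, the plan is an enrich policy over that lookup index plus an ingest pipeline with one enrich processor per IP field; a sketch of what I have in mind (policy name and target fields are placeholders):

```
PUT _enrich/policy/latest-vuln-policy
{
  "match": {
    "indices": "vulnerabilities_latest",
    "match_field": "server.ip",
    "enrich_fields": ["vulnerability.severity_num", "vulnerability.report_date"]
  }
}

POST _enrich/policy/latest-vuln-policy/_execute

PUT _ingest/pipeline/vuln-enrich
{
  "processors": [
    { "enrich": { "policy_name": "latest-vuln-policy", "field": "source.ip",
                  "target_field": "source.vulnerability", "ignore_missing": true } },
    { "enrich": { "policy_name": "latest-vuln-policy", "field": "destination.ip",
                  "target_field": "destination.vulnerability", "ignore_missing": true } }
  ]
}
```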
I found transforms, which could turn my vulnerabilities index into a latest_vulnerabilities index, but here I'm stuck. I tried a pivot transform with these approaches:
- group by: server.ip
aggregations: max(vulnerability.report_date), max(vulnerability.severity_num)
- this gives me one document per IP; the latest report date is correct, but the highest severity is taken across all reports, not only from the latest one
- group by: server.ip, vulnerability.report_date
aggregations: max(vulnerability.severity_num)
- this gives me one document per report_date with the correct highest severity, but there are multiple documents with the same server.ip and I'm not sure which one the enrich processor will pick
- a sub-aggregation nested under the max aggregation, with this transform definition:
```
{
  "source": {
    "index": "vulnerabilities",
    "query": {
      "bool": {
        "should": [
          { "exists": { "field": "vulnerability.severity_num" } }
        ],
        "minimum_should_match": 1
      }
    }
  },
  "pivot": {
    "group_by": {
      "server_ip": {
        "terms": {
          "field": "server.ip",
          "missing_bucket": true
        }
      }
    },
    "aggregations": {
      "max_report": {
        "max": {
          "field": "vulnerability.report_date"
        },
        "aggs": {
          "max_vuln": {
            "max": {
              "field": "vulnerability.severity_num"
            }
          }
        }
      }
    }
  },
  "description": "Latest vulnerability per ip",
  "dest": {
    "index": "vulnerabilities_latest"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "vulnerability.scan_date",
      "delay": "60s"
    }
  },
  "retention_policy": {
    "time": {
      "field": "vulnerability.scan_date",
      "max_age": "30d"
    }
  }
}
```
- but this fails with:

```
"reason" : "Aggregator [max_report] of type [max] cannot accept sub-aggregations"
```
How can I achieve my goal?
Thank you for your help.