EQL - Network Port scan - Watcher to EQL

Please help me to convert the below port scan watcher query to EQL in ELK SIEM 7.12.1.

```json
PUT _watcher/watch/port_scan_watch
{
  "trigger": {
    "schedule": {
      "interval": "10s"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "logstash-tcpdump-*"
        ],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                {
                  "match": {
                    "tags": "tcp_connection_started"
                  }
                },
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-30s"
                    }
                  }
                }
              ]
            }
          },
          "aggs": {
            "by_src_ip": {
              "terms": {
                "field": "src_ip"
              },
              "aggs": {
                "by_target_ip": {
                  "terms": {
                    "field": "dst_ip",
                    "order": {
                      "unique_port_count": "desc"
                    }
                  },
                  "aggs": {
                    "unique_port_count": {
                      "cardinality": {
                        "field": "dst_port"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "source": "for (def src : ctx.payload.aggregations.by_src_ip.buckets) { for (def dst : src.by_target_ip.buckets) { if (dst.unique_port_count.value > params.threshold) { return true; } } } return false;",
      "params": {
        "threshold": 50
      }
    }
  },
  "throttle_period": "30s",
  "actions": {
    "email_administrator": {
      "transform": {
        "script": {
          "source": "for (def src : ctx.payload.aggregations.by_src_ip.buckets) { for (def dst : src.by_target_ip.buckets) { if (dst.unique_port_count.value > params.threshold) { return ['body': 'Detected portscan from [' + src.key + '] to [' + dst.key + ']. ' + dst.unique_port_count.value + ' unique ports scanned.']; } } }",
          "params": {
            "threshold": 50
          }
        }
      },
      "email": {
        "profile": "standard",
        "attach_data": true,
        "priority": "high",
        "to": [
          "antonio@elastic.co"
        ],
        "subject": "[Security Alert] - Port scan detected",
        "body": "{{ctx.payload.body}}"
      }
    }
  }
}
```

Can you place it in a code block so it retains the format.

Hi @jancodenew, thanks for the post.

I see that your question presumes you want an EQL solution, but could you possibly take advantage of the security solution's "Threshold" rule type for this use case? The rule could look like this:

One note of caution that applies to watcher or detection engine rules with nested aggregations is that the number of aggregation buckets across all (source.ip x destination.ip) combinations could have very high cardinality in a large environment, so you might want to ensure that the rule operates on only a single comprehensive set of network data, and/or include filters in the original query where appropriate.

1 Like
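The reply above suggests a threshold rule rather than EQL; an EQL sequence cannot count distinct values, which is exactly what the watcher's `cardinality` aggregation does. The following is an added example (not code from this thread or from Elastic) showing what that threshold rule body would be for the detection-engine API, plus the same grouping logic run over a few constructed events so you can see what the rule does and does not alert on. It assumes the index still carries the watcher's own field names (`src_ip`, `dst_ip`, `dst_port`; use `source.ip`, `destination.ip`, `destination.port` if the data is ECS-mapped), that the threshold rule type accepts a multi-field `threshold.field` plus a `cardinality` clause, and that the cardinality `value` is a minimum (at least N unique ports), so `51` reproduces the watcher's strict `> 50`. Verify that last reading against the rule editor in your own 7.12.1 stack before trusting it.

```python
"""Port-scan detection as a Kibana threshold rule, with the grouping logic
emulated in Python over constructed sample events (added example, not from
the thread)."""
from collections import defaultdict
from datetime import datetime, timedelta

THRESHOLD = 50  # the watcher alerts on strictly more than 50 distinct ports

# Rule body for POST /api/detection_engine/rules. Field names src_ip/dst_ip/
# dst_port are taken from the watcher above and assumed to be keyword/ip
# mapped in logstash-tcpdump-*; the multi-field threshold and cardinality
# options are assumed to be available on this Kibana version.
THRESHOLD_RULE = {
    "name": "Port scan - many destination ports from one source to one host",
    "description": "One source opened TCP connections to more than "
                   f"{THRESHOLD} distinct ports on a single host in 30 s.",
    "risk_score": 73,
    "severity": "high",
    "type": "threshold",
    "index": ["logstash-tcpdump-*"],
    "language": "kuery",
    "query": "tags:tcp_connection_started",
    "interval": "10s",   # same schedule as the watcher trigger
    "from": "now-30s",   # same look-back as the watcher's range filter
    "threshold": {
        "field": ["src_ip", "dst_ip"],      # one bucket per (source, target) pair
        "value": 1,                         # any number of matching docs ...
        "cardinality": [                    # ... as long as distinct ports
            {"field": "dst_port", "value": THRESHOLD + 1}  # >= 51 == watcher's > 50
        ],
    },
    "enabled": True,
}


def detect_port_scans(events, window_end, window=timedelta(seconds=30),
                      threshold=THRESHOLD):
    """Emulate the rule: keep tcp_connection_started events inside the window,
    group by (src_ip, dst_ip) and report pairs whose distinct dst_port count
    exceeds the threshold. Returns {(src, dst): unique_port_count}."""
    start = window_end - window
    ports = defaultdict(set)
    for ev in events:
        if "tcp_connection_started" not in ev["tags"]:
            continue  # KQL filter: tags:tcp_connection_started
        if not start <= ev["@timestamp"] <= window_end:
            continue  # range filter: @timestamp >= now-30s
        ports[(ev["src_ip"], ev["dst_ip"])].add(ev["dst_port"])
    return {pair: len(p) for pair, p in ports.items() if len(p) > threshold}


def format_alert(pair, unique_ports):
    """Same message the watcher's transform script builds."""
    src, dst = pair
    return (f"Detected portscan from [{src}] to [{dst}]. "
            f"{unique_ports} unique ports scanned.")


# Constructed sample events (not real capture data).
NOW = datetime(2021, 4, 1, 12, 0, 0)


def _ev(src, dst, port, seconds_ago, tags=("tcp_connection_started",)):
    return {"@timestamp": NOW - timedelta(seconds=seconds_ago),
            "src_ip": src, "dst_ip": dst, "dst_port": port, "tags": list(tags)}


SAMPLE_EVENTS = (
    # a scanner sweeping 60 ports on one host within the last 30 s
    [_ev("10.0.0.5", "10.0.0.20", p, p % 30) for p in range(1, 61)]
    # a busy but legitimate client: 80 connections, all to port 443
    # (high doc count, cardinality 1: a plain count threshold would misfire)
    + [_ev("10.0.0.7", "10.0.0.20", 443, i % 30) for i in range(80)]
    # an older sweep, entirely outside the 30 s window
    + [_ev("10.0.0.9", "10.0.0.21", p, 120 + p) for p in range(1, 56)]
    # SYNs that never completed a handshake: not tagged tcp_connection_started
    + [_ev("10.0.0.11", "10.0.0.20", p, 5, tags=("tcp_syn",)) for p in range(1, 70)]
)

if __name__ == "__main__":
    for pair, n in detect_port_scans(SAMPLE_EVENTS, NOW).items():
        print(format_alert(pair, n))
```

The 10.0.0.7 client is the reason the cardinality clause matters: the watcher (and the rule above) count distinct `dst_port` values per pair, so 80 connections to a single port never trip the 50-port cut-off, whereas a threshold rule keyed only on document count would. As the reply notes, the number of (source, destination) buckets can be large on a busy network; the KQL `query` is the place to narrow the data (for example to a single sensor or to inbound traffic only) before the grouping happens.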

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.