Security analytics is a common use case for the Elastic Stack. In this short topic, we will set up a rule-based alert that triggers a notification when network or log events correlate with indicators of compromise. We want to alert in near real time so investigators can start their investigation and remediate as quickly as possible.
One of the great features of Elastic Alert is that if you can query it in Elasticsearch, you can alert on it. There are a number of built-in integrations: email, PagerDuty, Slack, and Jira, just to name a few. It also supports other monitoring infrastructure via a webhook or REST endpoint.
Our task at hand is to find any rogue host or remote IP address that may have accessed our website. We will store these known threats in an Elasticsearch index called notable_hosts. We are using Filebeat to ingest NGINX web access logs into Elasticsearch. We will query the filebeat-* indices for all remote hosts that have visited our site in the last five minutes and see if any of them correlate with an entry in the notable_hosts index. If a match is found, we will raise an alert in Slack. The full alert is assembled piece by piece below.
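Before looking at the watch itself, it may help to see what an entry in notable_hosts could look like. The sketch below builds a bulk request body that stores each indicator under geoip.ip, the field the lookup query matches on later. The IP addresses are made-up documentation addresses and the helper name build_bulk_body is hypothetical; the resulting body would be sent to the _bulk endpoint with a Content-Type of application/x-ndjson.

```python
import json

# Hypothetical indicators; replace with real threat intelligence.
NOTABLE_IPS = ["203.0.113.7", "198.51.100.23"]


def build_bulk_body(ips, index="notable_hosts"):
    """Return an NDJSON bulk body with one document per indicator IP."""
    lines = []
    for ip in ips:
        # Action line: index the next document into the target index.
        lines.append(json.dumps({"index": {"_index": index}}))
        # Source line: a nested geoip object, so that both the geoip.ip
        # field and _source.geoip.ip are available to the watch.
        lines.append(json.dumps({"geoip": {"ip": ip}}))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(build_bulk_body(NOTABLE_IPS), end="")
```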
First, we want the alert to run every five minutes.
"trigger": {
  "schedule": {
    "interval": "5m"
  }
}
Now we are ready to specify the data we are investigating. Since we are searching for correlations between two indices, we will need to use the Elastic Alert Chain Input. We will take the results of the first query, called ip_input (all of the remote hosts that have accessed our website in the last five minutes), and pass them into the second query, named lookup, to see if there is any match in the notable_hosts index.
"input": {
  "chain": {
    "inputs": [
      {
        "ip_input": {
          "search": {
            "request": {
              "search_type": "query_then_fetch",
              "indices": [
                "filebeat-*"
              ],
              "body": {
                "query": {
                  "bool": {
                    "must": [
                      {
                        "range": {
                          "@timestamp": {
                            "gte": "now-5m"
                          }
                        }
                      }
                    ]
                  }
                },
                "aggs": {
                  "remoteip": {
                    "terms": {
                      "field": "nginx.access.remote_ip"
                    },
                    "aggs": {
                      "beathost": {
                        "terms": {
                          "field": "beat.hostname"
                        }
                      }
                    }
                  }
                },
                "size": 0
              }
            }
          }
        }
      },
      {
        "lookup": {
          "search": {
            "request": {
              "search_type": "query_then_fetch",
              "indices": [
                "notable_hosts"
              ],
              "body": {
                "query": {
                  "terms": {
                    "geoip.ip": [
                      "{{#ctx.payload.ip_input.aggregations.remoteip.buckets}}{{key}}",
                      "{{/ctx.payload.ip_input.aggregations.remoteip.buckets}}"
                    ]
                  }
                }
              }
            }
          }
        }
      }
    ]
  }
}
Let's take a look at the terms query in the second query, lookup:
"query": {
  "terms": {
    "geoip.ip": [
      "{{#ctx.payload.ip_input.aggregations.remoteip.buckets}}{{key}}",
      "{{/ctx.payload.ip_input.aggregations.remoteip.buckets}}"
    ]
  }
}
We are using ctx, the Watch execution context, and its current payload. The query values are the keys of the terms aggregation buckets from the remoteip aggregation in the first query.
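To make the template expansion concrete, here is a rough Python emulation of what the Mustache section produces. It is not Watcher's template engine, only a sketch under the assumption that the two array elements are rendered as one string, so that the text between the opening and closing section tags is repeated once per bucket. The function name and sample IP addresses are illustrative.

```python
import json


def render_terms_values(buckets):
    """Emulate the Mustache section used in the lookup terms query."""
    # Each bucket contributes its key followed by `","`, which closes the
    # current JSON string and opens the next one.
    section = "".join(bucket["key"] + '","' for bucket in buckets)
    # The template text around the section is `["` before and `"]` after.
    rendered = '["' + section + '"]'
    return json.loads(rendered)


if __name__ == "__main__":
    sample = [{"key": "203.0.113.7"}, {"key": "198.51.100.23"}]
    print(render_terms_values(sample))
```

In this emulation the rendered list ends with an empty string, because the closing section tag sits inside its own quoted array element.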
In addition, we have a child aggregation within the remoteip aggregation in the ip_input query, which collects the hosts that produced the problematic log events. We will forward this information as part of the notification to the investigators.
The next section in our watch is the condition. If the condition evaluates to true, then an alerting action should trigger.
"condition": {
  "compare": { "ctx.payload.lookup.hits.total": { "gt": 0 } }
}
Notice that ctx.payload.lookup.hits.total looks very similar to a query response in Elasticsearch.
Before we perform the alert action, we should clean up the ctx.payload so that only the relevant information is carried forward. We can do that in the transform section.
"transform": {
  "script": {
    "lang": "painless",
    "source": """
      def first=ctx.payload.lookup.hits.hits.stream().map(p -> p._source.geoip.ip).collect(Collectors.toList());
      def second=ctx.payload.ip_input.aggregations.remoteip.buckets.stream().map(
        e->{
          return [ 'indicator': e.key, 'host': e.beathost.buckets.stream().map(h -> h.key).collect(Collectors.toList()) ]
        }).filter(p->first.contains(p.indicator)).collect(Collectors.toList());
      return second;"""
  }
}
A few things to note here:
- We want all the rogue hosts or IP addresses that have matched:
  def first=ctx.payload.lookup.hits.hits.stream().map(p -> p._source.geoip.ip).collect(Collectors.toList());
- We will filter the remoteip aggregation results down to those that correlate with the rogue IP addresses in the variable first, and return the rogue IP addresses along with the list of hosts where they have been spotted in the logs.
- Please see the java.util.stream package for syntax details.
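For readers less familiar with Java streams, the same correlation can be sketched in Python. This is only an illustrative equivalent of the Painless transform, not something Watcher runs; the sample payload mimics the shape of the two chained search results, and its IP addresses and hostnames are made up.

```python
# Made-up payload in the shape produced by the ip_input and lookup searches.
SAMPLE_PAYLOAD = {
    "ip_input": {
        "aggregations": {
            "remoteip": {
                "buckets": [
                    {"key": "203.0.113.7",
                     "beathost": {"buckets": [{"key": "web-01"}, {"key": "web-02"}]}},
                    {"key": "192.0.2.10",
                     "beathost": {"buckets": [{"key": "web-01"}]}},
                ]
            }
        }
    },
    "lookup": {
        "hits": {"total": 1, "hits": [{"_source": {"geoip": {"ip": "203.0.113.7"}}}]}
    },
}


def correlate(payload):
    """Python equivalent of the Painless transform, for illustration."""
    # Rogue IPs returned by the lookup query against notable_hosts.
    first = [hit["_source"]["geoip"]["ip"] for hit in payload["lookup"]["hits"]["hits"]]
    # Keep only the remoteip buckets whose key is a rogue IP, and attach
    # the hostnames from the beathost child aggregation.
    return [
        {
            "indicator": bucket["key"],
            "host": [h["key"] for h in bucket["beathost"]["buckets"]],
        }
        for bucket in payload["ip_input"]["aggregations"]["remoteip"]["buckets"]
        if bucket["key"] in first
    ]


if __name__ == "__main__":
    print(correlate(SAMPLE_PAYLOAD))
```

Because the transform returns a list rather than a map, Watcher exposes the result to later stages as ctx.payload._value, which is what the Slack message iterates over.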
The last part of our alert is to send the notification to Slack. All that is left to do is to format the message using Mustache.
"actions": {
  "notify-slack": {
    "throttle_period": "5m",
    "slack": {
      "account": "team1",
      "message": {
        "from": "watcher",
        "to": [ "@sherry" ],
        "text": "System Monitoring",
        "attachments": [
          {
            "title": "Correlations Found",
            "text": "Please investigate these indicators and hosts:\n{{#ctx.payload._value}}{{.}}\n\n{{/ctx.payload._value}}",
            "color": "danger"
          }
        ]
      }
    }
  }
}
Finally, to test and debug the Watch, use the _execute API.
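As a sketch of what such a test call could look like, the snippet below assembles an _execute request for a stored watch. The watch ID notable_hosts_watch and the helper name are hypothetical; ignore_condition, action_modes, and record_execution are request body options of the Execute Watch API, and the simulate mode keeps the notify-slack action from actually posting to Slack. The path shown is the one used by recent Elasticsearch versions; older releases exposed it under _xpack/watcher instead.

```python
import json


def build_execute_request(watch_id):
    """Return (method, path, body) for a dry run of the given watch."""
    body = {
        # Run the transform and actions even if no correlation is found.
        "ignore_condition": True,
        # Simulate the Slack action instead of sending a real message.
        "action_modes": {"notify-slack": "simulate"},
        # Do not store this test run in the watch history.
        "record_execution": False,
    }
    return "POST", f"_watcher/watch/{watch_id}/_execute", json.dumps(body)


if __name__ == "__main__":
    # "notable_hosts_watch" is a hypothetical watch ID.
    method, path, body = build_execute_request("notable_hosts_watch")
    print(method, path)
    print(body)
```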