Hi Team,
I am a newbie to ELK.
I am trying to create a sample threat-detection watch, using the URL below as a reference.
https://github.com/elastic/examples/tree/master/Security%20Analytics/ssh_analysis
I am trying to create an email alert with the following email body:
Application: SSH
User : root
Source IP : x.x.x.x
Destination: x.x.x.x
Actions: More than x denied logins, followed by access granted, for the <User>.
Watcher script
POST _xpack/watcher/watch/_execute
{
  "watch": {
    "metadata": {
      "window_period": "5m",
      "required_failures": 3
    },
    "trigger": {
      "schedule": {
        "interval": "5s"
      }
    },
    "input": {
      "chain": {
        "inputs": [
          {
            "previous": {
              "search": {
                "request": {
                  "indices": ["cef-ssh-watch-results"],
                  "types": "brute_force",
                  "body": {
                    "size": 0,
                    "aggs": {
                      "users": {
                        "terms": {
                          "field": "destinationUserName",
                          "size": 100
                        },
                        "aggs": {
                          "times": {
                            "terms": {
                              "field": "@timestamp",
                              "size": 100
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          },
          {
            "events": {
              "search": {
                "request": {
                  "indices": [
                    "cef-ssh-*"
                  ],
                  "types": "syslog",
                  "body": {
                    "query": {
                      "bool": {
                        "filter": [
                          {
                            "terms": {
                              "categoryBehaviour": [
                                "cowrie.login.success",
                                "cowrie.login.failed"
                              ]
                            }
                          },
                          {
                            "exists": {
                              "field": "destinationUserName"
                            }
                          }
                        ]
                      }
                    },
                    "aggregations": {
                      "users": {
                        "terms": {
                          "field": "destinationUserName",
                          "size": 1500,
                          "min_doc_count": 4
                        },
                        "aggs": {
                          "times": {
                            "terms": {
                              "field": "@timestamp",
                              "size": 15000,
                              "order": {
                                "_term": "asc"
                              }
                            },
                            "aggs": {
                              "access": {
                                "terms": {
                                  "field": "categoryBehaviour",
                                  "size": 1
                                }
                              }
                            }
                          }
                        }
                      }
                    },
                    "size": 0
                  }
                }
              }
            }
          }
        ]
      }
    },
    "condition": {
      "script": {
        "inline": "if (ctx.payload.events.hits.total == 0 || ctx.payload.events.aggregations.users.buckets.size() == 0) { return false; } def historical_events = []; if (ctx.payload.previous.hits.total > 0) { historical_events = ctx.payload.previous.aggregations.users.buckets.stream().flatMap(user -> user.times.buckets.stream().map(time -> user.key + '-' + time.key)).collect(Collectors.toList()); } for (user in ctx.payload.events.aggregations.users.buckets) { def failed = 0; for (time in user.times.buckets) { if (time.access.buckets[0].key == 'cowrie.login.failed') { failed += 1; } else { if (failed >= ctx.metadata.required_failures && !historical_events.contains(user.key + '-' + time.key)) { return true; } else { failed=0; } } } } return false;"
      }
    },
    "transform": {
      "script": "def historical_events = []; if (ctx.payload.previous.hits.total > 0) { historical_events = ctx.payload.previous.aggregations.users.buckets.stream().flatMap(user -> user.times.buckets.stream().map(time -> user.key + '-' + time.key)).collect(Collectors.toList()); } def users=[:]; for (user in ctx.payload.events.aggregations.users.buckets) { def times = []; def failed = 0; for (time in user.times.buckets) { if (time.access.buckets[0].key == 'cowrie.login.failed') { failed += 1; } else { if (failed >= ctx.metadata.required_failures && !historical_events.contains(user.key + '-' + time.key)) { times.add(time.key_as_string); } failed = 0; } } if (times.length > 0) { users[user.key] = times; } } return users;"
    },
    "actions": {
      "log": {
        "logging": {
          "text": "More than {{ctx.metadata.required_failures}} denied logins, followed by access granted, by the same user: {{ctx.payload}}"
        }
      },
      "index_payload": {
        "transform": {
          "script": "return ['_doc':ctx.payload.entrySet().stream().flatMap(value -> value.getValue().stream().map(timestamp -> ['alert':true,'@timestamp':timestamp,'destinationUserName':value.getKey()])).collect(Collectors.toList())];"
        },
        "index": {
          "index": "cef-ssh-watch-results",
          "doc_type": "brute_force"
        }
      },
      "email_alert": {
        "email": {
          "to": "John <John@example.com>",
          "subject": "Suspected SSH Brute force Alert",
          "body": "More than {{ctx.metadata.required_failures}} denied logins, followed by access granted, by the same user: {{ctx.payload}}"
        }
      }
    }
  }
}
Please assist.