Email alert with readable body

Hi Team,

I am a newbie to ELK.

I am trying to create a sample threat-detection setup, using the URL below as a reference.

https://github.com/elastic/examples/tree/master/Security%20Analytics/ssh_analysis

I am trying to create an email alert with the following email body.

Application: SSH
User : root
Source IP : x.x.x.x
Destination: x.x.x.x
Actions: More than x denied logins, followed by access granted, for the <User>.

Watcher script

  POST _xpack/watcher/watch/_execute

{
"watch": {
"metadata": {
"window_period": "5m",
"required_failures": 3
},
"trigger": {
"schedule": {
"interval": "5s"
}
},
"input": {
"chain": {
"inputs": [
{
"previous": {
"search": {
"request": {
"indices": ["cef-ssh-watch-results"],
"types": "brute_force",
"body": {
"size": 0,
"aggs": {
"users": {
"terms": {
"field": "destinationUserName",
"size": 100
},
"aggs": {
"times": {
"terms": {
"field": "@timestamp",
"size": 100
}
}
}
}
}
}
}
}
}
},
{
"events": {
"search": {
"request": {
"indices": [
"cef-ssh-*"
],
"types": "syslog",
"body": {
"query": {
"bool": {
"filter": [
{
"terms": {
"categoryBehaviour": [
"cowrie.login.success",
"cowrie.login.failed"
]
}
},
{
"exists": {
"field": "destinationUserName"
}
}
]
}
},
"aggregations": {
"users": {
"terms": {
"field": "destinationUserName",
"size": 1500,
"min_doc_count": 4
},
"aggs": {
"times": {
"terms": {
"field": "@timestamp",
"size": 15000,
"order": {
"_term": "asc"
}
},
"aggs": {
"access": {
"terms": {
"field": "categoryBehaviour",
"size": 1
}
}
}
}
}
}
},
"size": 0
}
}
}
}
}
]
}
},
"condition": {
"script": {
"inline": "if (ctx.payload.events.hits.total == 0 || ctx.payload.events.aggregations.users.buckets.size() == 0) { return false; } def historical_events = []; if (ctx.payload.previous.hits.total > 0) { historical_events = ctx.payload.previous.aggregations.users.buckets.stream().flatMap(user -> user.times.buckets.stream().map(time -> user.key + '-' + time.key)).collect(Collectors.toList()); } for (user in ctx.payload.events.aggregations.users.buckets) { def failed = 0; for (time in user.times.buckets) { if (time.access.buckets[0].key == 'cowrie.login.failed') { failed += 1; } else { if (failed >= ctx.metadata.required_failures && !historical_events.contains(user.key + '-' + time.key)) { return true; } else { failed=0; } } } } return false;"
}
},
"transform": {
"script": "def historical_events = []; if (ctx.payload.previous.hits.total > 0) { historical_events = ctx.payload.previous.aggregations.users.buckets.stream().flatMap(user -> user.times.buckets.stream().map(time -> user.key + '-' + time.key)).collect(Collectors.toList()); } def users=[:]; for (user in ctx.payload.events.aggregations.users.buckets) { def times = []; def failed = 0; for (time in user.times.buckets) { if (time.access.buckets[0].key == 'cowrie.login.failed') { failed += 1; } else { if (failed >= ctx.metadata.required_failures && !historical_events.contains(user.key + '-' + time.key)) { times.add(time.key_as_string); } failed = 0; } } if (times.length > 0) { users[user.key] = times; } } return users;"
},
"actions": {
"log": {
"logging": {
"text": "More than {{ctx.metadata.required_failures}} denied logins, followed by access granted, by the same user: {{ctx.payload}}"
}
},
"index_payload": {
"transform": {
"script": "return ['_doc':ctx.payload.entrySet().stream().flatMap(value -> value.getValue().stream().map(timestamp -> ['alert':true,'@timestamp':timestamp,'destinationUserName':value.getKey()])).collect(Collectors.toList())];"
},
"index": {
"index": "cef-ssh-watch-results",
"doc_type": "brute_force"
}
},
"email_alert": {
"email": {
"to": "'John John@example.com'",
"subject": "Suspected SSH Brute force Alert",
"body": "More than {{ctx.metadata.required_failures}} denied logins, followed by access granted, by the same user: {{ctx.payload}}"
}
}
}
}
}

Please assist.

Please invest some more time and explain properly what your problem is and what exactly fails. Also provide the output of the execute watch API and describe exactly what your expectations are. In addition, explain what you have tried so far and what did not work as expected, so that we can understand the use case better and see how we can improve on the Watcher side.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.