I'm using Kibana and Elasticsearch 8.15. I'm sending custom, parsed logs to Elasticsearch. One of the fields is of type IP and, obviously, contains an IP address. I want to set up custom actions (email / webhook) when a new, unique IP appears for the first time in a given time period (ideally the last 7 days).
At the moment I've scripted this in Python, but it's not efficient (searching 1 million documents and comparing them with the entries from the last 5 minutes takes ~2-3 minutes), and with scripting I can't achieve real-time analytics.
From what I've read, this is what Watcher was created for, but I have no idea how to write a proper watch. I've tried, e.g.:
{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["myindex"],
        "body": {
          "query": {
            "bool": {
              "must": [
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-1w/w",
                      "lt": "now/w"
                    }
                  }
                }
              ],
              "must_not": {
                "terms": {
                  "ip": {
                    "index": "myindex",
                    "id": "unique_ips_last_week",
                    "path": "ip"
                  }
                }
              }
            }
          },
          "aggs": {
            "unique_ips": {
              "terms": {
                "field": "ip",
                "size": 1000000
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.aggregations.unique_ips.buckets.length": {
        "gt": 0
      }
    }
  },
  "actions": {
    "email_admin": {
      "email": {
        "to": ["admin@example.com"],
        "subject": "New IP Address Detected",
        "body": {
          "text": "The following IPs have been detected for the first time this week:\n{{#ctx.payload.aggregations.unique_ips.buckets}}{{key}} ({{doc_count}} occurrences)\n{{/ctx.payload.aggregations.unique_ips.buckets}}"
        }
      }
    }
  }
}
and other, similar approaches, but nothing works. Mostly, I don't understand how to check whether an entry is unique within a given time period.
With a query like this:
POST qmail-logs/_search
{
  "size": 0,
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-7d/d",
        "lt": "now"
      }
    }
  },
  "aggs": {
    "ip_counts": {
      "terms": {
        "field": "ip",
        "size": 10000000
      }
    }
  }
}
I can get all values from the last week, and those with
"doc_count": 1
are unique (they occur only once), but I have no idea how to set up something like this in Watcher / alerting.
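To make the comparison I am after more concrete, here is a minimal Python sketch of one possible approach, not a tested solution: ask for the earliest `@timestamp` of each IP in the 7-day window (a `min` sub-aggregation under the `terms` aggregation) and treat an IP as new when that earliest timestamp falls within the last few minutes. The function names `build_first_seen_query` and `new_ips`, the sub-aggregation name `first_seen`, the 5-minute window, and the sample response are all assumptions made up for illustration; only the field names `ip` and `@timestamp` and the aggregation name `ip_counts` come from the query above.

```python
from datetime import datetime, timedelta, timezone


def build_first_seen_query(field="ip", lookback_days=7, max_ips=10000):
    """Build a search body with one bucket per IP and its earliest timestamp.

    max_ips is a hypothetical cap on the number of buckets returned.
    """
    return {
        "size": 0,
        "query": {
            "range": {"@timestamp": {"gte": f"now-{lookback_days}d", "lt": "now"}}
        },
        "aggs": {
            "ip_counts": {
                "terms": {"field": field, "size": max_ips},
                # Earliest occurrence of this IP inside the lookback window.
                "aggs": {"first_seen": {"min": {"field": "@timestamp"}}},
            }
        },
    }


def new_ips(response, now, recent=timedelta(minutes=5)):
    """Return IPs whose earliest occurrence lies within the `recent` window.

    A `min` aggregation on a date field reports `value` in epoch milliseconds,
    so the cutoff is converted to milliseconds before comparing.
    """
    cutoff_ms = (now - recent).timestamp() * 1000
    buckets = response["aggregations"]["ip_counts"]["buckets"]
    return [b["key"] for b in buckets if b["first_seen"]["value"] >= cutoff_ms]


# Hand-made response for illustration only (not real data).
now = datetime(2024, 9, 1, 12, 0, tzinfo=timezone.utc)
now_ms = now.timestamp() * 1000
sample = {
    "aggregations": {
        "ip_counts": {
            "buckets": [
                # First seen 3 days ago: already known.
                {"key": "10.0.0.1", "doc_count": 42,
                 "first_seen": {"value": now_ms - 3 * 24 * 3600 * 1000}},
                # First seen 1 minute ago: new within the 7-day window.
                {"key": "10.0.0.9", "doc_count": 1,
                 "first_seen": {"value": now_ms - 60 * 1000}},
            ]
        }
    }
}

print(new_ips(sample, now))  # ['10.0.0.9']
```

Unlike filtering on `"doc_count": 1`, this would also catch a new IP that has already produced several log lines. Whether the same request body and comparison can be expressed directly as a Watcher search input and condition is exactly what I have not managed to work out.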
Can anyone help with how something like this can be done, and whether it's possible at all?