Hi, i'm trying to make a watcher which has a condition that input data meet when its count is larger than or equal to 100.
and i would like to index result data using Index Action.
but i know that count of result data(ctx.payload.hits) is limited to 10000.
so i couldn't index all of them that meet the condition if ctx.payload.hits.total is larger than 10000. (as you know, the total can show real count, but the count of result data is limited.)
i need your help..
let me show you the similar watcher to mine below.
{
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"XXX"
],
"types": [],
"body": {
"size": 10000,
"query": {
"bool": {
"must": [
...
]
}
},
"aggs": {
"aaa_count": {
"terms": {
"field": "aaa"
},
"aggs": {
"bbb_count": {
"terms": {
"field": "bbb"
}
}
}
}
}
}
}
}
},
"condition": {
"script": {
"source": "for(int i=0;i<ctx.payload.aggregations.aaa_count.buckets.length;i++) {for(int j=0;j<ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets.length;j++) {if(ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count >= 100) {return true; } } } return false;",
"lang": "painless"
}
},
"actions": {
"index_payload": {
"transform": {
"script": {
"source": "def events = []; for(int i=0;i<ctx.payload.aggregations.aaa_count.buckets.length;i++) {for(int j=0;j<ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets.length;j++) {if(ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count >= 5) {def millis = System.currentTimeMillis(); def event = ['incident_id':'XXX' + millis, 'flag': 'incident', 'alert_type':'XXX', 'aaa': ctx.payload.aggregations.aaa_count.buckets[i].key, 'bbb': ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].key, 'count': ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count]; events.add(event); for(int k=0;k<ctx.payload.hits.total;k++) {if(ctx.payload.hits.hits[k]._source.aaa == ctx.payload.aggregations.aaa_count.buckets[i].key && ctx.payload.hits.hits[k]._source.bbb == ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].key) {def temp = ctx.payload.hits.hits[k]._source; temp.put('incident_id','XXX' + millis); temp.put('flag','raw'); temp.put('alert_type','XXX'); events.add(temp); } } } } } return ['_doc': events];",
"lang": "painless"
}
},
"index": {
"index": "<alerts-{now/d}>",
"doc_type": "alerts",
"execution_time_field": "alert_date"
}
}
}
}
i think it's too difficult to read the above script, so i arranged it.
condition:
for(int i=0;i<ctx.payload.aggregations.aaa_count.buckets.length;i++) {
for(int j=0;j<ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets.length;j++) {
if(ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count >= 100) {
return true;
}
}
}
return false;
action:
def events = [];
for(int i=0;i<ctx.payload.aggregations.aaa_count.buckets.length;i++) {
for(int j=0;j<ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets.length;j++) {
if(ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count >= 5) {
def millis = System.currentTimeMillis();
def event = ['incident_id':'XXX' + millis,
'flag': 'incident',
'alert_type':'XXX',
'aaa': ctx.payload.aggregations.aaa_count.buckets[i].key,
'bbb': ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].key,
'count': ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count];
events.add(event);
// index result data.
for(int k=0;k<ctx.payload.hits.total;k++) {
if(ctx.payload.hits.hits[k]._source.aaa == ctx.payload.aggregations.aaa_count.buckets[i].key
&& ctx.payload.hits.hits[k]._source.bbb == ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].key) {
def temp = ctx.payload.hits.hits[k]._source;
temp.put('incident_id','XXX' + millis);
temp.put('flag','raw');
emp.put('alert_type','XXX');
events.add(temp);
}
}
}
}
}
return ['_doc': events];