Watcher index action - how to get all the hits data to index

Hi, I'm trying to create a watch whose condition is met when the count of matching input data is greater than or equal to 100,
and I would like to index the resulting data using the index action.

However, the number of hits returned in the payload (ctx.payload.hits.hits) is limited to 10,000,
so I can't index all of the matching documents when ctx.payload.hits.total is larger than 10,000. (As you know, the total reports the real count, but the number of returned hits is capped.)

I need your help.

Below is a watch similar to mine.

  {
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "XXX"
        ],
        "types": [],
        "body": {
          "size": 10000,
          "query": {
            "bool": {
              "must": [
                ...
              ]
            }
          },
          "aggs": {
            "aaa_count": {
              "terms": {
                "field": "aaa"
              },
              "aggs": {
                "bbb_count": {
                  "terms": {
                    "field": "bbb"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "source": "for(int i=0;i<ctx.payload.aggregations.aaa_count.buckets.length;i++) {for(int j=0;j<ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets.length;j++) {if(ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count >= 100) {return true; } } } return false;",
      "lang": "painless"
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": "def events = []; for(int i=0;i<ctx.payload.aggregations.aaa_count.buckets.length;i++) {for(int j=0;j<ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets.length;j++) {if(ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count >= 5) {def millis = System.currentTimeMillis(); def event = ['incident_id':'XXX' + millis, 'flag': 'incident', 'alert_type':'XXX', 'aaa': ctx.payload.aggregations.aaa_count.buckets[i].key, 'bbb': ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].key, 'count': ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].doc_count]; events.add(event); for(int k=0;k<ctx.payload.hits.total;k++) {if(ctx.payload.hits.hits[k]._source.aaa == ctx.payload.aggregations.aaa_count.buckets[i].key && ctx.payload.hits.hits[k]._source.bbb == ctx.payload.aggregations.aaa_count.buckets[i].bbb_count.buckets[j].key) {def temp = ctx.payload.hits.hits[k]._source; temp.put('incident_id','XXX' + millis); temp.put('flag','raw'); temp.put('alert_type','XXX'); events.add(temp); } } } } } return ['_doc': events];",
          "lang": "painless"
        }
      },
      "index": {
        "index": "<alerts-{now/d}>",
        "doc_type": "alerts",
        "execution_time_field": "alert_date"
      }
    }
  }
}

The scripts above are hard to read, so I've reformatted them below.

condition:

    // Fire the watch as soon as any (aaa, bbb) sub-bucket holds at least 100 documents.
    for (def aaaBucket : ctx.payload.aggregations.aaa_count.buckets) {
      for (def bbbBucket : aaaBucket.bbb_count.buckets) {
        if (bbbBucket.doc_count >= 100) {
          return true;
        }
      }
    }
    return false;

action:

// Transform for the index action: for every (aaa, bbb) bucket pair whose
// doc_count meets the threshold, emit one 'incident' summary document followed
// by the raw hits belonging to that pair. Everything is returned under '_doc'
// so the index action indexes each element as a separate document.
def events = [];
for (def aaaBucket : ctx.payload.aggregations.aaa_count.buckets) {
  for (def bbbBucket : aaaBucket.bbb_count.buckets) {
    // NOTE(review): the watch condition fires at doc_count >= 100, but this
    // transform uses >= 5 -- confirm which threshold is intended.
    if (bbbBucket.doc_count >= 5) {
      // Shared by the summary and its raw hits so they can be correlated.
      def millis = System.currentTimeMillis();
      def event = ['incident_id':'XXX' + millis,
        'flag': 'incident',
        'alert_type':'XXX',
        'aaa': aaaBucket.key,
        'bbb': bbbBucket.key,
        'count': bbbBucket.doc_count];
      events.add(event);
      // index result data.
      // Iterate over the hits actually returned (at most the request's "size"),
      // not up to ctx.payload.hits.total: total counts every match, so using it
      // as the bound indexes past the end of hits.hits once total exceeds size.
      // Hits beyond "size" are simply not available in the payload.
      for (def hit : ctx.payload.hits.hits) {
        if (hit._source.aaa == aaaBucket.key && hit._source.bbb == bbbBucket.key) {
          def temp = hit._source;
          temp.put('incident_id','XXX' + millis);
          temp.put('flag','raw');
          temp.put('alert_type','XXX');
          events.add(temp);
        }
      }
    }
  }
}
return ['_doc': events];

@spinscale, any thoughts on this?

You should not do this with Watcher's index action, as it was not built to support this use case: we strive for short-running watches, and this would not be one. Also, the search input would need to be changed from the current regular search to something more efficient.

What you could do instead is run a reindex (via the Reindex API), maybe even without needing a watch at all, by simply triggering it yourself. Would that work for you? If not, what would be the limiting factor?

OK, let me explain in more detail what we are doing.
We are building a system that collects syslogs from firewall devices (something like a SIEM).
We have to alert users when a potentially threatening condition is met (this corresponds to the rule function of a SIEM system).
Those parts are fine; I think we can handle them.
However, we are having difficulty showing users the raw data behind each alert, i.e. the hits that met the condition. When an alert is triggered, users want to see and investigate the raw data to understand why it fired.
So we are trying to do this with the index action, as in the code above, but we are not sure whether this is the right approach.

Here is one of the alert conditions we are implementing:

When the same source IP accesses more than 100 distinct destination IPs on destination port 445 within 1 minute.

We tried it like this:

    {
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "firewall*"
        ],
        "types": [],
        "body": {
          "size": 10000,
          "query": {
            "bool": {
              "must": [
                {
                  "match": {
                    "dst_port": 445
                  }
                },
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-1m"
                    }
                  }
                }
              ]     
            }
          },
          "aggs": {
            "src_ip_count": {
              "terms": {
                "field": "src_ip"
              },
              "aggs": {
                "removed_dups_dst_ip_count": {
                  "cardinality": {
                    "field": "dst_ip"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "array_compare": {
      "ctx.payload.aggregations.src_ip_count.buckets": {
        "path": "removed_dups_dst_ip_count.value",
        "gte": {
          "value": 100,
          "quantifier": "some"
        }
      }
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": "def malcious_ips = []; for(int i=0;i<ctx.payload.aggregations.src_ip_count.buckets.length;i++) {if(ctx.payload.aggregations.src_ip_count.buckets[i].removed_dups_dst_ip_count.value>=100) {def millis = System.currentTimeMillis(); def malcious_ip = ['incident_id':'worm_traffic_tcp_445' + millis, 'flag': 'incident', 'alert_type':'worm_traffic_tcp_445', 'src_ip': ctx.payload.aggregations.src_ip_count.buckets[i].key, 'dst_port': 445, 'dst_ip_count': ctx.payload.aggregations.src_ip_count.buckets[i].removed_dups_dst_ip_count.value]; malcious_ips.add(malcious_ip); for(int k=0;k<ctx.payload.hits.total;k++) {if(ctx.payload.hits.hits[k]._source.src_ip == ctx.payload.aggregations.src_ip_count.buckets[i].key) {def temp = ctx.payload.hits.hits[k]._source; temp.put('incident_id','worm_traffic_tcp_445' + millis); temp.put('flag','base'); temp.put('alert_type','worm_traffic_tcp_445'); malcious_ips.add(temp); } } } } return ['_doc': malcious_ips];",
          "lang": "painless"
        }
      },
      "index": {
        "index": "<alerts-{now/d}>",
        "doc_type": "alerts",
        "execution_time_field": "alert_date"
      }
    }
  }
}

Thanks for the suggestion, but I didn't quite understand it. What do you mean by reindexing? Do you mean creating another index action for this watch?

1 Like

Is there any chance you could just point them to Kibana so they can explore the data themselves? Then all you need to do is provide a link to Kibana.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.