Watcher - Alert when two fields from two indices match

I am trying to deploy a watcher in my home lab. I have logs from zeek (JSON) and logs from intelmq (Threat Intelligence Sharing). I need to compare one field from zeek_index (field: id.resp_h) with one field from intelmq_index (field: source.ip). If they match, I would like to write a log entry with some text.

My issue is with the input and the condition.

---------begin my watcher--------
PUT _watcher/watch/checkIP
{
  "metadata": { "color": "red" },
  "trigger": {
    "schedule": { "interval": "30s" }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["filebeat_zeek-*"],
        "body": {
          "size": 0,
          "query": {
            "match": { "source.ip": "" }
          }
        }
      }
    }
  },
  "condition": {
    "compare": { "": { "gt": 1 } }
  },
  "actions": {
    "log_error": {
      "logging": {
        "text": "ohhh! {{}} . check the network. Malware on!"
      }
    }
  }
}
---------end my watcher----------

I can find the IP when I search in Kibana, but the alert does not match.
Note: I would like to replace >"match": {"source.ip": ""}< with the field from the intelmq index, but if the simple example fails, something more complex will fail too.


I am not sure I fully understand the requirement. Would it make sense to execute two searches, one in the zeek index and another in the intelmq index, for that IP and then check whether both return hits?
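The two-search idea could be sketched as a watch with a chain input, built here as a Python dict so it can be inspected before being sent to the Watcher API. This is only a sketch under assumptions: the index pattern `intelmq-*`, the input names `zeek` and `intel`, the action name `log_match`, and the example address `192.0.2.10` are hypothetical, and the condition assumes `hits.total` is exposed as an integer in the watch payload.

```python
import json


def build_two_search_watch(ip, zeek_index="filebeat_zeek-*", intel_index="intelmq-*"):
    """Build a watch body that searches both indices for one IP address.

    Index patterns and input names are assumptions for illustration.
    """
    def search(index, field):
        # size 0: only the hit count is needed, not the documents themselves
        return {"search": {"request": {
            "indices": [index],
            "body": {"size": 0, "query": {"match": {field: ip}}},
        }}}

    return {
        "trigger": {"schedule": {"interval": "30s"}},
        # A chain input runs each named input in order; results are
        # available under ctx.payload.<name>
        "input": {"chain": {"inputs": [
            {"zeek": search(zeek_index, "id.resp_h")},
            {"intel": search(intel_index, "source.ip")},
        ]}},
        # Fire only when both searches return at least one hit
        "condition": {"script": {"source":
            "return ctx.payload.zeek.hits.total > 0 && ctx.payload.intel.hits.total > 0"}},
        "actions": {"log_match": {"logging": {
            "text": "IP " + ip + " seen in both the zeek and intelmq indices"}}},
    }


watch = build_two_search_watch("192.0.2.10")
print(json.dumps(watch, indent=2))
```

The printed JSON is what would go in the body of a `PUT _watcher/watch/<id>` request. It only checks one known IP, so it does not yet compare the two indices against each other.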

I'm sorry, let me try to explain better.

  • I have two data feeds, each in its own index (zeek_index and intelmq_index).
  • zeek_index has a field named "destination-ip".
  • intelmq_index has a field named "source-ip".
  • I need to compare every "destination-ip" value in zeek_index with every "source-ip" value in intelmq_index.
  • If any "destination-ip" equals a "source-ip", send an email, write a log entry, or index a document.

I am trying to write the right code, but I don't know how to combine the "search" and the "compare" to make it work.

In my example I used [[ "match": {"source.ip": ""} ]] to check that the basic watch works, but it doesn't.

I think I must compare each IP from zeek (zeek_index, destination-ip) with all IPs from intelmq (intelmq_index, source-ip) and send the result if they match.

Best regards

I cannot think of any efficient way of doing this, as you need to compare every entry of one index with every entry of another index. And I suppose the IP address space can be arbitrarily large (given IPv6)?
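To make the comparison being discussed concrete, here is a minimal sketch of it outside Elasticsearch, assuming both lists of documents have already been fetched and fit in memory. The function name `matching_ips` and the sample documents are hypothetical; the field names are the ones given in the question. Addresses are parsed with Python's `ipaddress` module so that different spellings of the same IPv6 address compare equal.

```python
import ipaddress


def matching_ips(zeek_docs, intel_docs):
    """Return the IPs that appear in both lists of documents (as strings)."""
    # Parse into address objects so "2001:db8::1" equals "2001:db8:0:0:0:0:0:1"
    intel_ips = {ipaddress.ip_address(d["source-ip"]) for d in intel_docs}
    zeek_ips = {ipaddress.ip_address(d["destination-ip"]) for d in zeek_docs}
    # Set intersection avoids comparing every entry with every other entry
    return sorted(str(ip) for ip in zeek_ips & intel_ips)


# Hypothetical sample data using documentation-only address ranges
zeek = [
    {"destination-ip": "192.0.2.10"},
    {"destination-ip": "198.51.100.7"},
    {"destination-ip": "2001:db8:0:0:0:0:0:1"},
]
intel = [
    {"source-ip": "192.0.2.10"},
    {"source-ip": "2001:db8::1"},
]

print(matching_ips(zeek, intel))  # ['192.0.2.10', '2001:db8::1']
```

This only illustrates the matching logic. It does not address how to pull all values out of both indices, which is the expensive part raised above.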

I am not looking for an efficient method yet; that will be the second step. For now I need to search for and match a list of strings (IP addresses) in my zeek index. IPv6 will be the third step. Any ideas to point me in the right direction?
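One possible starting point for matching a list of IP addresses against the zeek index is a `terms` query, which matches documents whose field contains any of the listed values. The sketch below builds such request bodies in Python. The helper name `build_terms_queries` and the sample addresses are hypothetical, and the field name `id.resp_h` is taken from the first post. The list is split into chunks because Elasticsearch caps the number of terms per query (65,536 by default, via the `index.max_terms_count` setting).

```python
def build_terms_queries(ips, field="id.resp_h", max_terms=65536):
    """Build one search body per chunk of IPs, each using a terms query."""
    unique = sorted(set(ips))  # drop duplicates so they don't count against the limit
    return [
        {"size": 0, "query": {"terms": {field: unique[i:i + max_terms]}}}
        for i in range(0, len(unique), max_terms)
    ]


# Hypothetical list of IPs taken from the threat-intelligence feed
intel_ips = ["192.0.2.10", "198.51.100.7", "192.0.2.10", "203.0.113.5"]

for body in build_terms_queries(intel_ips, max_terms=2):
    print(body)
```

Each body could be used as the `body` of a search request against `filebeat_zeek-*`; any chunk returning hits means at least one listed IP appears in the zeek logs.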