How to Compare two different fields value of two different index?

please guide me.. please guide me to the following problem,
I have two different index and i want to compare two fields value and get result which exist in different index

for example :-
Index 1: threatintel
fields :- 1. indicator_ip
2. message
3. userid

Index 2: checkpoint
fields:- 1. srcip
2. dstip

Need query : Give source IP, message and timestamp
where threatintel.indicator_ip = (checkpoint.srcip or checkpoint.dstip)

@jimczi please help me to achieve it.

A chain input watch is what you need. You can make two separate queries and use the context from the first search as a part of the logic in the second search.

Hi Rich,

thanks for showing me a way, but I don't find good documentation on chain input. please share some article/examples/ or any other reference which I can follow and try to understand, How to Compare two different fields value of two different indexes?

Here's a simple example of comparing two simple static values:

#sample watch comparing two values
POST _xpack/watcher/watch/_execute
{
  "watch": {
    "trigger": {
      "schedule": {
        "interval": "1m"
      }
    },
    "input": {
      "chain": {
        "inputs": [
          {
            "first": {
              "simple": {
                "value": "4"
              }
            }
          },
          {
            "second": {
              "simple": {
                "value": "4"
              }
            }
          }
        ]
      }
    },
    "condition": {
      "script": {
        "source": "return ctx.payload.second.value.equals(ctx.payload.first.value)"
      }
    },
    "actions": {
      "log": {
        "logging": {
          "text": "they are equal!"
        }
      }
    }
  }
}

In this case, if the two values are equal, the Watch will log "they are equal!", otherwise nothing will happen.

A snippet of the output if run in Dev Tools Console is:

      "condition": {
        "type": "script",
        "status": "success",
        "met": true
      },
      "actions": [
        {
          "id": "log",
          "type": "logging",
          "status": "success",
          "logging": {
            "logged_text": "they are equal!"
          }
        }
      ]
    },
    "messages": []
  }
}

If the two input chains are actual queries (instead of silly simple static values), then the structure will be more complicated. For example, here I'm counting the total number of anomalies found (above a score of 75) for two different ML jobs over the last 2 years :

#sample watch comparing volume of two indices
POST _xpack/watcher/watch/_execute
{
  "watch": {
    "trigger": {
      "schedule": {
        "interval": "1m"
      }
    },
    "input": {
      "chain": {
        "inputs": [
          {
            "first": {
              "search": {
                "request": {
                  "indices": [
                    ".ml-anomalies-*"
                  ],
                  "body": {
                    "query": {
                      "bool": {
                        "filter": [
                          {
                            "range": {
                              "timestamp": {
                                "gte": "now-2y"
                              }
                            }
                          },
                          {
                            "term": {
                              "result_type": "bucket"
                            }
                          },
                          {
                            "term": {
                              "job_id": "farequote"
                            }
                          },
                          {
                            "range": {
                              "anomaly_score": {
                                "gte": "75"
                              }
                            }
                          }
                        ]
                      }
                    }
                  }
                }
              }
            }
          },
          {
            "second": {
              "search": {
                "request": {
                  "indices": [
                    ".ml-anomalies-*"
                  ],
                  "body": {
                    "query": {
                      "bool": {
                        "filter": [
                          {
                            "range": {
                              "timestamp": {
                                "gte": "now-2y"
                              }
                            }
                          },
                          {
                            "term": {
                              "result_type": "bucket"
                            }
                          },
                          {
                            "term": {
                              "job_id": "gallery"
                            }
                          },
                          {
                            "range": {
                              "anomaly_score": {
                                "gte": "75"
                              }
                            }
                          }
                        ]
                      }
                    }
                  }
                }
              }
            }
          }
        ]
      }
    },
    "condition": {
          "compare" : { "ctx.payload.first.hits.total" : { "eq" : "{{ctx.payload.second.hits.total}}" }}
    },
    "actions": {
      "log": {
        "logging": {
          "text": "they are equal!"
        }
      }
    }
  }
}

If the number of the anomalies found is the same, then the message is logged. Here's a snippet of the watch output showing that in my case, this second example didn't match (the condition was not met):

     "condition": {
        "type": "compare",
        "status": "success",
        "met": false,
        "compare": {
          "resolved_values": {
            "ctx.payload.second.hits.total": 55,
            "ctx.payload.first.hits.total": 6
          }
        }
      },
      "actions": []
    },
    "messages": []
  }
}

Notice that I have used two different approaches for the condition block - the first uses a script and the second uses a compare.

Hope this helps.

1 Like

Thanks a Lot, Rich,

below I tried:-

Index -1

GET threatintel/_search

{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "threatintel",
        "_type": "doc",
        "_id": "001",
        "_score": 1,
        "_source": {
          "hostname": "newhostname1",
          "indicator-ip": "10.10.10.11"
        }
      },
      {
        "_index": "threatintel",
        "_type": "doc",
        "_id": "002",
        "_score": 1,
        "_source": {
          "hostname": "newhostname2",
          "indicator-ip": "10.10.10.10"
        }
      }
    ]
  }
}

then index -2 and watcher---

GET checkpoint/_search

{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "checkpoint",
        "_type": "doc",
        "_id": "001",
        "_score": 1,
        "_source": {
          "hostname": "testhostname",
          "srcip": "10.10.10.10"
        }
      }
    ]
  }
}

here is the watcher:- 
    {
      "trigger": {
        "schedule": {
          "interval": "1m"
        }
      },
      "input": {
        "chain": {
          "inputs": [
            {
              "first": {
                "search": {
                  "request": {
                    "search_type": "query_then_fetch",
                    "indices": [
                      "checkpoint*"
                    ],
                    "types": [],
                    "body": {
                      "query": {
                        "bool": {
                          "filter": [
                            {
                              "range": {
                                "timestamp": {
                                  "gte": "now-1d"
                                }
                              }
                            },
                            {
                              "match": {
                                "srcip": "10.10.10.10"
                              }
                            }
                          ]
                        }
                      }
                    }
                  }
                }
              }
            },
            {
              "second": {
                "search": {
                  "request": {
                    "search_type": "query_then_fetch",
                    "indices": [
                      "threatintel*"
                    ],
                    "types": [],
                    "body": {
                      "query": {
                        "bool": {
                          "filter": [
                            {
                              "range": {
                                "timestamp": {
                                  "gte": "now-1d"
                                }
                              }
                            },
                            {
                              "match": {
                                "indicator-ip": "10.10.10.10"
                              }
                            }
                          ]
                        }
                      }
                    }
                  }
                }
              }
            }
          ]
        }
      },
      "condition": {
        "compare": {
          "ctx.payload.first.hits.hits.0._source.srcip": {
            "eq": "{{ctx.payload.second.hits.hits.0._source.indicator-ip}}"
          }
        }
      },
      "actions": {
        "log": {
          "logging": {
            "level": "info",
            "text": "checkpoint sourceIP {{ctx.payload.first.hits.hits.0._source.srcip}} matching {{ctx.payload.second.hits.hits.0._source.indicator-ip}}"
          }
        }
      }
    }

I am getting "execution_state": "failed", and
"exception": {
"type": "index_out_of_bounds_exception",
"reason": "Index: 0, Size: 0"
}

Put your entire watch code into:

POST _xpack/watcher/watch/_execute
{
  "watch": {

#insert your watch code here

  }
}

and run in dev tools console. Attach the output in your response.

ohh apologies, i used the watcher UI, is it still necessary?

Do it in dev tools console to see the details of the watch execution

it seems its failing

{
  "_id": "_inlined__140f7366-09bf-4c7c-9a15-79b6c9c9f52a-2018-05-24T18:05:50.994Z",
  "watch_record": {
    "watch_id": "_inlined_",
    "node": "q7xL1NmzTJevx9SAcOY1CA",
    "state": "failed",
    "status": {
      "state": {
        "active": true,
        "timestamp": "2018-05-24T18:05:50.994Z"
      },
      "actions": {
        "log": {
          "ack": {
            "timestamp": "2018-05-24T18:05:50.994Z",
            "state": "awaits_successful_execution"
          }
        }
      },
      "execution_state": "failed",
      "version": -1
    },
    "trigger_event": {
      "type": "manual",
      "triggered_time": "2018-05-24T18:05:50.994Z",
      "manual": {
        "schedule": {
          "scheduled_time": "2018-05-24T18:05:50.994Z"
        }
      }
    },

Can you get a simple watch working (not a chained one) against just one of the indices?

i have removed the timestamp as my index do not have it. my bad...
i tried this ---

POST _xpack/watcher/watch/_execute
{
  "watch": 
  {
        "trigger": {
    "schedule": {
      "interval": "30m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "checkpoint*"
        ],
        "types": [],
        "body": {
          "size": 0,
          "query": {
            "match_all": {}
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gte": 1
      }
    }
  },
  "actions": {
    "my-logging-action": {
      "logging": {
        "level": "info",
        "text": "ip is {{ctx.payload.hits.hits.0._source.srcip}} "
      }
    }
  }

  }
}

output is ------>

{
  "_id": "_inlined__5be15f43-967d-403e-8029-501491a93696-2018-05-24T18:47:00.199Z",
  "watch_record": {
    "watch_id": "_inlined_",
    "node": "ntz1QNeQQ5WNKqFuBGWkzg",
    "state": "executed",
    "status": {
      "state": {
        "active": true,
        "timestamp": "2018-05-24T18:47:00.198Z"
      },
      "last_checked": "2018-05-24T18:47:00.199Z",
      "last_met_condition": "2018-05-24T18:47:00.199Z",
      "actions": {
        "my-logging-action": {
          "ack": {
            "timestamp": "2018-05-24T18:47:00.198Z",
            "state": "awaits_successful_execution"
          },
          "last_execution": {
            "timestamp": "2018-05-24T18:47:00.199Z",
            "successful": false,
            "reason": ""
          }
        }
      },
      "execution_state": "executed",
      "version": -1
    },
    "trigger_event": {
      "type": "manual",
      "triggered_time": "2018-05-24T18:47:00.199Z",
      "manual": {
        "schedule": {
          "scheduled_time": "2018-05-24T18:47:00.199Z"
        }
      }
    },
    "input": {
      "search": {
        "request": {
          "search_type": "query_then_fetch",
          "indices": [
            "checkpoint*"
          ],
          "types": [],
          "body": {
            "size": 0,
            "query": {
              "match_all": {}
            }
          }
        }
      }
    },
    "condition": {
      "compare": {
        "ctx.payload.hits.total": {
          "gte": 1
        }
      }
    },
    "result": {
      "execution_time": "2018-05-24T18:47:00.199Z",
      "execution_duration": 23,
      "input": {
        "type": "search",
        "status": "success",
        "payload": {
          "_shards": {
            "total": 5,
            "failed": 0,
            "successful": 5,
            "skipped": 0
          },
          "hits": {
            "hits": [],
            "total": 1,
            "max_score": 0
          },
          "took": 2,
          "timed_out": false
        },
        "search": {
          "request": {
            "search_type": "query_then_fetch",
            "indices": [
              "checkpoint*"
            ],
            "types": [],
            "body": {
              "size": 0,
              "query": {
                "match_all": {}
              }
            }
          }
        }
      },
      "condition": {
        "type": "compare",
        "status": "success",
        "met": true,
        "compare": {
          "resolved_values": {
            "ctx.payload.hits.total": 1
          }
        }
      },
      "actions": [
        {
          "id": "my-logging-action",
          "type": "logging",
          "status": "failure",
          "error": {
            "root_cause": [
              {
                "type": "general_script_exception",
                "reason": "Error running com.github.mustachejava.codes.DefaultMustache@42f14af6"
              }
            ],
            "type": "general_script_exception",
            "reason": "Error running com.github.mustachejava.codes.DefaultMustache@42f14af6",
            "caused_by": {
              "type": "mustache_exception",
              "reason": "Failed to get value for ctx.payload.hits.hits.0._source.srcip @[query-template:1]",
              "caused_by": {
                "type": "mustache_exception",
                "reason": "0 @[query-template:1]",
                "caused_by": {
                  "type": "index_out_of_bounds_exception",
                  "reason": "0"
                }
              }
            }
          }
        }
      ]
    },
    "messages": []
  }
}

Take out "size": 0 from the input query.

you are awesome!!!

Thanks, Rich, for taking this to the end. finally, it worked and I have successfully created my first co-related watcher. now I believe I will be able to explore ELK as a SIEM,
please explain to me why size was interpreting it?
again, thanks,

here is the final Watcher which I created -

POST _xpack/watcher/watch/_execute
{
  "watch": 
  {
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "chain": {
      "inputs": [
        {
          "first": {
            "search": {
              "request": {
                "search_type": "query_then_fetch",
                "indices": [
                  "checkpoint*"
                ],
                "types": [],
                "body": {
                  "query": {
                    "match": {
                      "srcip": "10.10.10.10"
                    }
                  }
                }
              }
            }
          }
        },
        {
          "second": {
            "search": {
              "request": {
                "search_type": "query_then_fetch",
                "indices": [
                  "threatintel*"
                ],
                "types": [],
                "body": {
                  "query": {
                    "match": {
                      "indicator-ip": "10.10.10.10"
                    }
                  }
                }
              }
            }
          }
        }
      ]
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.first.hits.hits.0._source.srcip": {
        "eq": "{{ctx.payload.second.hits.hits.0._source.indicator-ip}}"
      }
    }
  },
  "actions": {
    "log": {
      "logging": {
        "level": "info",
        "text": "source ip   {{ctx.payload.first.hits.hits.0._source.srcip}} from checkpoint logs  is matching equals to ThreatIntel Ip  {{ctx.payload.second.hits.hits.0._source.indicator-ip}} "
      }
    }
  }
}
}
1 Like

size=0 is a way to suppress the output records ("hits") - it is usually used when your query also uses an aggregation (and you're more interested in the results of the aggregation than the raw records that contribute to the aggregation)

Thanks for your Help Rich,

HI Rich,

now both index has IP address field but with different name, and due to many logs, their are multiple IP address, is it possible to compare aggregated buckets array of one index with another. how should i write a compare condition where any of the ip matches, alert should trigger

POST _xpack/watcher/watch/_execute
{
  "watch": 
  {
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "chain": {
      "inputs": [
        {
          "first": {
            "search": {
              "request": {
                "search_type": "query_then_fetch",
                "indices": [
                  "logstash-checkpoint*"
                ],
                "types": [],
                "body": {
                  "aggs": {
    "checkpoint-srcip": {
      "terms": {
        "field": "srcip.keyword",
          "order": {
          "_count": "desc"
        }
      }
    }
  },
  "stored_fields": [
    "*"
  ],
  "script_fields": {},
  "docvalue_fields": [
    "@timestamp",
    "security_log.time"
  ],
  "query": {
    "bool": {
      "must": [
        {
          "match_all": {}
        },
        {
          "range": {
            "@timestamp": {
              "gte": "now-1h"
            }
          }
        }
      ],
      "filter": [],
      "should": [],
      "must_not": []
    }
  }
                }
              }
            }
          }
        },
        {
          "second": {
            "search": {
              "request": {
                "search_type": "query_then_fetch",
                "indices": [
                  "logstash-security-*"
                ],
                "types": [],
                "body": {
                    "size": 0,
  "_source": {
    "excludes": []
  },
  "aggs": {
    "threatintel-indicator": {
      "terms": {
        "field": "indicator.keyword",
            "order": {
          "_count": "desc"
        }
      }
    }
  },
  "stored_fields": [
    "*"
  ],
  "script_fields": {},
  "docvalue_fields": [
    "@timestamp",
    "security_log.time"
  ],
  "query": {
    "bool": {
      "must": [
        {
          "match_all": {}
        },
        {
          "range": {
            "@timestamp": {
                "gte" : "now-1y"
            }
          }
        }
      ],
      "filter": [],
      "should": [],
      "must_not": []
    }
  }
                }
              }
            }
          }
        }
      ]
    }
  },
  "condition": {
    "compare": {
     ????????????????????????????????????????????????
    }
  },
  "actions": {
    "log": {
      "logging": {
        "level": "info",
        "text": "what condition should i write"
      }
    }
  }
}
    
}

This is probably helpful:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.