Retrieve more than 10000 records for search request in Watcher

Hi Guys,
I am trying to retrieve more than 10k records through a search request in Watcher, but because of its max limit it rejects all the records over and above the 10k mark. There is already a discussion
here on how the Scroll API can be used for an Elasticsearch query, but how do we use it in Watcher?
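
For context, as I understand it, the scroll flow outside Watcher looks roughly like this (index name, scroll window, and page size are just placeholders):

    POST /logstash-wps*/_search?scroll=1m
    {
      "size": 1000,
      "query": { "match_all": {} }
    }

    POST /_search/scroll
    {
      "scroll": "1m",
      "scroll_id": "<scroll_id from the previous response>"
    }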

Hey,

Watcher does not support searching for more than that maximum number of documents. Maybe you can talk a little bit about your use case and we can check if there is a chance to express this in a different manner.

Thanks!

--Alex

Hi Alex,

Actually, I have tried to implement a Splunk-style dedup through Watcher; please find the configuration for it below.

So basically, I have used an aggregation to get the unique values of a field (sourcetype, in this case), and alongside that there is a query to retrieve the actual hits. (And here is the actual problem: sometimes the total hits of the query exceed 10k, and when I try to access those rejected hits in the transform I get an index-out-of-bounds error because they have been chopped off.)
Finally, in the transform section, for every unique value of the field obtained through the aggregation, I find a matching document in hits.hits. (If the aggregation had also returned the other fields along with the sourcetype field, I wouldn't have to use the transform at all!)

These deduplicated hits are then emailed through a send-email action, which has been removed from the watch below as per organization policy.

    {
      "trigger": {
        "schedule": {
          "interval" : "5000m"
        }
      },
      "input": {
        "search": {
          "request": {
            "search_type": "query_then_fetch",
            "indices": [
            "logstash-wps*"
            ],
            "rest_total_hits_as_int": true,
            "body": {
              "size" : 10000,
              "query": {
                "bool": {
                  "must": [
                    {
                      "term": {
                        "app": "wps_bcl_my"
                      }
                    },
                    {
                      "range": {
                        "@timestamp": {
                          "gte": "now-15m"
                        }
                      }
                    }
                  ]
                }
              },
              "aggregations" : {
                "unique_sources" : {
                  "terms" : {
                    "field" : "sourcetype",
                    "size" : 100
                  }
                }
              }
            }
          }
        }
      },
      "condition": {
        "compare": {
          "ctx.payload.hits.total": {
            "gt": 0
          }
        }
      },
      "transform":{
        "script":
        """
        // total hit count as reported by the search; it can be larger than the
        // number of hits actually returned, which is capped at "size": 10000
        def doc_count = ctx.payload.hits.total;

        def unique_hits = [ ];

        // unique sourcetype values coming from the terms aggregation
        def unique_sources = ctx.payload.aggregations.unique_sources.buckets.stream().map(hit->hit.key).collect(Collectors.toList());
        // for every unique sourcetype, keep the first matching document from hits.hits
        for(def source : unique_sources){
          for(def i=0; i< doc_count; i++){
            if(source==ctx.payload.hits.hits[i]._source.sourcetype){
              unique_hits.add(ctx.payload.hits.hits[i]);
              break;
            }
          }
        }
        return unique_hits;
        """
      }
    }
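
Just to illustrate the parenthetical above: from what I understand, a top_hits sub-aggregation could return one full document per sourcetype directly, which would remove the need for the transform (and, with "size" : 0 at the top level, the 10k hits as well). A rough, untested sketch of just the aggregation part, with sample_doc as a made-up name:

    "aggregations" : {
      "unique_sources" : {
        "terms" : {
          "field" : "sourcetype",
          "size" : 100
        },
        "aggregations" : {
          "sample_doc" : {
            "top_hits" : {
              "size" : 1
            }
          }
        }
      }
    }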

Have you considered trying to reduce or even eliminate duplicates at indexing time instead so you do not need to do this at all?

Another option might be to eliminate most duplicates in your ingest pipeline even before you send them to Elasticsearch.
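
For example, if your version has the fingerprint ingest processor, something along these lines could collapse duplicates into the same document at index time (the pipeline name and the fingerprinted fields are only illustrative, not a tested setup):

    PUT _ingest/pipeline/dedup_sketch
    {
      "processors": [
        {
          "fingerprint": {
            "fields": [ "sourcetype", "message" ],
            "target_field": "fingerprint"
          }
        },
        {
          "set": {
            "field": "_id",
            "value": "{{fingerprint}}"
          }
        }
      ]
    }

Documents whose fingerprinted fields are identical would then share the same _id, so a duplicate would overwrite (or be rejected in favour of) the earlier copy instead of becoming a new document.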

Hi Christian,
As per our requirement, we can't deduplicate during ingestion or indexing; the dedup is needed only for alerting!
