Ensure that watcher does not miss documents (logs)

Hey

When searching for logs in Elastic using Watcher, a range query on the @timestamp field can be used:

"range": {
  "@timestamp": {
    "gte": "now-1h-5m",
    "lt": "now-5m"
  }
}

Note the -5m, which I have not seen anyone else use yet. This leaves some time for logs to be processed and end up in Elasticsearch before they are searched. Failing to specify such a delay and just using now will result in missing all logs which fall into the range but have not yet made it into Elasticsearch. (Tracked in elastic/examples#221).
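To illustrate the effect, here is a hedged sketch with hypothetical events and ingest delays: a window ending at now silently drops an event that is still in the pipeline, while the window shifted back by 5 minutes does not.

```python
from datetime import datetime, timedelta

# Hypothetical events: (origin @timestamp, ingest delay until searchable).
now = datetime(2018, 4, 12, 11, 0, 0)
events = [
    (now - timedelta(minutes=2), timedelta(minutes=3)),    # still in the pipeline
    (now - timedelta(minutes=30), timedelta(seconds=10)),  # long since indexed
]

def visible_in_window(event_ts, delay, gte, lt):
    # An event is found only if its @timestamp falls in [gte, lt)
    # AND it has already been indexed when the query runs.
    return gte <= event_ts < lt and event_ts + delay <= now

# Window ending at `now`: the in-flight event matches the range but is
# not yet searchable, so it is silently missed.
missed = [e for e in events
          if now - timedelta(hours=1) <= e[0] < now
          and not visible_in_window(e[0], e[1], now - timedelta(hours=1), now)]

# Window shifted back by 5 minutes: every event in the range has had at
# least 5 minutes to travel through the pipeline before the query runs.
gte, lt = now - timedelta(hours=1, minutes=5), now - timedelta(minutes=5)
missed_shifted = [e for e in events
                  if gte <= e[0] < lt
                  and not visible_in_window(e[0], e[1], gte, lt)]
```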

I noticed that the actual time when the watch is executed varies slightly (by a few hundred ms; field: result.execution_time). Examples:

  • 2018-04-12 11:07:57.299
  • 2018-04-12 10:07:57.190
  • 2018-04-12 09:07:57.175
  • 2018-04-12 08:07:57.453
  • 2018-04-12 07:07:57.140

This causes certain documents to be missed, while others are processed twice.

For example, the last execution at 2018-04-12 11:07:57.299 only looked back until 2018-04-12 10:02:57.299, while the previous execution at 10:07:57.190 had only covered logs up to 10:02:57.190. So 109 ms worth of logs were missed! For security logs, this is not acceptable. Is there any way to ensure that no such gaps can occur? The only partial mitigation I could find is date rounding, but that is by no means a full solution. Another, better option would be some overlap between the range queries, but this leads to duplicate events. It might be possible to deduplicate those events again, for example using the Elasticsearch _id when outputting the watch result into an index, but this is not always applicable, and finding an _id scheme which fulfills this is going to be difficult.
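The size of that gap can be reproduced from the execution times above; a small sketch, using the window and offset from the query at the top:

```python
from datetime import datetime, timedelta

fmt = "%Y-%m-%d %H:%M:%S.%f"
prev_exec = datetime.strptime("2018-04-12 10:07:57.190", fmt)
curr_exec = datetime.strptime("2018-04-12 11:07:57.299", fmt)

offset, window = timedelta(minutes=5), timedelta(hours=1)

# Each execution searches [exec - window - offset, exec - offset).
prev_lt = prev_exec - offset            # previous window ended here
curr_gte = curr_exec - window - offset  # current window starts here

gap = curr_gte - prev_lt
print(gap.total_seconds())  # 0.109 -> the 109 ms of logs neither run sees
```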

Ref: Watcher example: errors_in_logs

Basing any kind of exactly-once processing logic on the timestamp representing when the event originated is tricky, as delays in the ingest process can vary a lot. A better way might be to add a timestamp representing the ingest time to each event. This should be less sensitive to delays in the pipeline and can be added e.g. through Logstash or an ingest pipeline using the _ingest.timestamp metadata field.
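For instance, an ingest pipeline with a set processor can copy _ingest.timestamp into each document. A sketch of such a request body follows; the target field name event_ingested is a placeholder I made up:

```python
import json

# Sketch of an ingest pipeline body that stamps each document with the
# time it entered Elasticsearch. The target field name is hypothetical;
# _ingest.timestamp is the metadata field mentioned above.
pipeline_body = {
    "description": "Add an ingest timestamp to every document",
    "processors": [
        {
            "set": {
                "field": "event_ingested",         # hypothetical field name
                "value": "{{_ingest.timestamp}}",  # resolved at ingest time
            }
        }
    ],
}

print(json.dumps(pipeline_body, indent=2))
```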

I am, however, not aware of any built-in feature that can be used to automatically keep track of the last processed timestamp.

To get around the fact that watcher execution start time is not exact, date rounding might be an option now that the potential delays have been reduced.

Another option could be to keep track of the last processed timestamp through a document in a special index. You could use a chained input to first read the last timestamp from that index and then use it as the starting point for the range filter in the main query. At the end, an index action could update the timestamp document with the highest timestamp processed by the watch, so that the next execution picks up at exactly the right spot.
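The bookkeeping could look roughly like this; a hedged Python sketch in which an in-memory dict stands in for the timestamp document in the special index (all names are hypothetical):

```python
# `store` stands in for a document in a dedicated Elasticsearch index.
store = {"last_processed": "2018-04-16T05:00:00.000Z"}

def run_watch(search, now):
    # Chained input, step 1: read the watermark from the special index.
    watermark = store["last_processed"]
    # Step 2: main query, strictly after the watermark to avoid re-reads.
    hits = search(gt=watermark, lt=now)
    # Index action: persist the highest timestamp seen, so the next
    # execution resumes exactly where this one stopped.
    if hits:
        store["last_processed"] = max(h["@timestamp"] for h in hits)
    return hits

docs = [
    {"@timestamp": "2018-04-16T05:10:00.000Z"},
    {"@timestamp": "2018-04-16T05:20:00.000Z"},
]

def fake_search(gt, lt):
    # ISO-8601 timestamps compare correctly as plain strings.
    return [d for d in docs if gt < d["@timestamp"] < lt]

first = run_watch(fake_search, "2018-04-16T05:15:00.000Z")   # finds the 05:10 doc
second = run_watch(fake_search, "2018-04-16T05:30:00.000Z")  # resumes, finds 05:20
```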

There is yet another hurdle: Elasticsearch refresh times. In order to make sure you don't have any misses, you would need to issue a refresh before every watch execution and be sure that your ingestion pipeline does not suffer from delays.

If you really need those semantics of exactly once, then a search might not be what you are searching for. Have you thought about running a percolate query against each of the documents?

Thanks to you both for the input! I address your points first and then document my findings below.

I guess the _refresh is not needed as long as the time offset (-5m in my example above) is bigger than the time events spend in the log collection pipeline plus the refresh_interval set for the Elasticsearch index?

I tried to wrap my head around the percolate query and how it would work for my use case, but I don't fully understand it yet. Just a short question: with the percolate query, is it possible to get all new documents which have been indexed since the last percolate query was executed? I read that it has limitations with regard to running aggregations. My use case is very similar to https://github.com/elastic/examples/tree/master/Alerting/Sample%20Watches/errors_in_logs, except that I run an aggregation and the watch only transforms the aggregation result.

Using the ingest time is an interesting idea; I will give it some more thought. We already have the ingest time added by Logstash for other reasons.

Storing the timestamp of the last processed event is also interesting, but I think it can be avoided with the approach proposed below.

Proposed solution

Date rounding, as mentioned by Christian Dahlqvist, is one part of the solution, I would say. The issue was that I had just used the interval schedule and had not read the Watcher Schedule Trigger docs carefully. I was afraid of start times drifting over time; I prevented this by using one of the other trigger schedules.

For writing and testing my watches, I use the integration tests of elastic/examples; however, I cannot yet contribute to Elastic during my work hours because my company has not yet signed the CLA, so I document my findings here.

Some thought has been put into verifying that we do not lose any logs or get duplicates. I am confident that I achieved this, as long as the Elasticsearch cluster is always operational and no snapshots are restored or anything like that.

I will paste the relevant parts of two example watches, written in YAML with relevant documentation.

Example for triggering every minute

Partial watch README.md

Triggers and date ranges

The watch triggers at second 23 every minute and selects exactly one minute.
Because the actual time at which the query is executed varies slightly (which determines the value of now in the range query), we use date rounding to the full minute.

Because events need some time from when they are emitted at the origin until they have been picked up, traveled through the log collection pipeline and been indexed and refreshed (_refresh API) in Elasticsearch, a watch running now does not look at the range from now-1m until now; an additional delay of 5 minutes is used.

The first timestamp included is for example 2018-04-16T05:59:00.000Z and the last 2018-04-16T05:59:59.999Z.

This can be tested with the following query. Just index suitable documents before.
2018-04-16T06:05:00.999Z represents now:

GET test-2018/_search?filter_path=took,hits.total,hits.hits._source
{
  "sort": [
    "@timestamp"
  ],
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2018-04-16T06:05:00.999Z||-1m-5m/m",
        "lt": "2018-04-16T06:05:00.999Z||-5m/m"
      }
    }
  }
}
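The same date math can be reproduced outside of Elasticsearch to double-check the boundaries; a small sketch in which flooring to the minute stands in for the /m rounding (both gte and lt round down):

```python
from datetime import datetime, timedelta

# `now` as in the test query above: 2018-04-16T06:05:00.999Z.
now = datetime(2018, 4, 16, 6, 5, 0, 999000)

def floor_minute(ts):
    # Equivalent of Elasticsearch date math rounding `/m` for gte/lt.
    return ts.replace(second=0, microsecond=0)

gte = floor_minute(now - timedelta(minutes=1) - timedelta(minutes=5))  # ||-1m-5m/m
lt = floor_minute(now - timedelta(minutes=5))                          # ||-5m/m

print(gte.isoformat())  # 2018-04-16T05:59:00
print(lt.isoformat())   # 2018-04-16T06:00:00
```

This matches the window stated above: the first included timestamp is 05:59:00.000 and the last is 05:59:59.999.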

Date selection considerations for integration tests

gte: 2018-04-16T12:14:00.000Z, offset -360
lt:  2018-04-16T12:14:59.999Z, offset -300.001
now: 2018-04-16T12:20:00.000Z

gte: 2018-04-16T12:14:00.000Z, offset -419.999
lt:  2018-04-16T12:14:59.999Z, offset -360
now: 2018-04-16T12:20:59.999Z

offset -360 is always in the range. offset from -419.999 until (including)
-300.001 is sometimes in the range, depending on now. now is not mocked by
the test framework so we need to use -360 for all test data or something
outside of the range for deterministic tests.
Unfortunately, deterministic tests cannot currently be guaranteed because there is some delay between the offset-to-timestamp calculation and the execution of the watch. If that happens in integration tests, rerun the test. The probability of this happening is very low, at ~0.3 % (~0.2 s / 60 s * 100).

The offset calculations can be verified with this Python code:

import datetime

(datetime.datetime.strptime('2018-04-16T12:20:59.999Z', "%Y-%m-%dT%H:%M:%S.%fZ") - datetime.datetime.strptime('2018-04-16T12:14:00.000Z', "%Y-%m-%dT%H:%M:%S.%fZ")).total_seconds()  # 419.999

Partial watch example

---

# yamllint disable rule:line-length rule:comments-indentation

metadata:

  ## Use an offset to allow events to travel through their pipeline.
  ## See: https://discuss.elastic.co/t/ensure-that-watcher-does-not-miss-documents-logs/127780/1
  time_offset: '5m'
  time_window: '1m'

## We never want to throttle/drop actions.
throttle_period: '0s'

trigger:
  schedule:
    ## Based on ctx.metadata.time_window.
    ## Everyone triggers jobs at second 0 so this one is triggered at second 23
    ## every minute.
    cron:
      - '23 * *    * * ?'

input:
  search:
    ## Based on ctx.metadata.time_window.
    timeout: '40s'
    request:

      body:

        query:
          range:
            '@timestamp':
              gte: 'now-{{ctx.metadata.time_window}}-{{ctx.metadata.time_offset}}/m'
              lt: 'now-{{ctx.metadata.time_offset}}/m'

Example for triggering every hour

Partial watch README.md

Triggers and date ranges

The watch triggers at minute 32 every hour and selects the complete last hour. The staging watch triggers at minute 36.
Because the actual time at which the query is executed varies slightly (which determines the value of now in the range query), we use date rounding to the full hour.

Because events need some time from when they are emitted at the origin until they have been picked up from the database table, traveled through the log collection pipeline and been indexed and refreshed (_refresh API) in Elasticsearch, a watch running now does not look at the range from now-1h until now; an additional delay of 30 minutes is used.

The first timestamp included is for example 2018-04-16T05:00:00.000Z and the last 2018-04-16T05:59:59.999Z.

This can be tested with the following query. Just index suitable documents before.
2018-04-16T06:29:59.999Z represents now:

GET test-2018/_search?filter_path=took,hits.total,hits.hits._source
{
  "sort": [
    "@timestamp"
  ],
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2018-04-16T06:29:59.999Z||-1h-30m/h",
        "lt": "2018-04-16T06:29:59.999Z||-30m/h"
      }
    }
  }
}

Date selection considerations

gte: 2018-04-16T05:00:00.000Z, offset -5400
lt: 2018-04-16T05:59:59.999Z, offset -1800.001
now: 2018-04-16T06:30:00.000Z

gte: 2018-04-16T05:00:00.000Z, offset -8999.999
lt: 2018-04-16T05:59:59.999Z, offset -5400
now: 2018-04-16T07:29:59.999Z

offset -5400 is always in the range. Offset from -8999.999 until (including)
-1800.001 is sometimes in the range, depending on now. now is not mocked by
the test framework so we need to use -5400 for all test data or something
outside of the range for deterministic tests.
Unfortunately, deterministic tests cannot currently be guaranteed because there is some delay between the offset-to-timestamp calculation and the execution of the watch. If that happens in integration tests, rerun the test. The probability of this happening is very low, at ~0.005 % (~0.2 s / 3600 s * 100).

The offset calculations can be verified with this Python code:

import datetime

(datetime.datetime.strptime('2018-04-16T07:29:59.999Z', "%Y-%m-%dT%H:%M:%S.%fZ") - datetime.datetime.strptime('2018-04-16T05:00:00.000Z', "%Y-%m-%dT%H:%M:%S.%fZ")).total_seconds()  # 8999.999

Partial watch example

---

# yamllint disable rule:line-length rule:comments-indentation

metadata:
  ## Use an offset to allow events to travel through their pipeline.
  ## See: https://discuss.elastic.co/t/ensure-that-watcher-does-not-miss-documents-logs/127780/1
  time_offset: '30m'
  time_window: '1h'

## We never want to throttle/drop actions.
throttle_period: '0s'

trigger:
  schedule:
    ## `hourly` is based on ctx.metadata.time_window.
    hourly:
      ## `minute` can be changed freely, the actual time range is
      ## calculated/rounded in the range filter to full hours.
      minute: 32

input:
  search:
    timeout: '5m'
    request:

      body:

        query:
          range:
            '@timestamp':
              gte: 'now-{{ctx.metadata.time_window}}-{{ctx.metadata.time_offset}}/h'
              lt: 'now-{{ctx.metadata.time_offset}}/h'
