Thanks to you both for your input! I address your points first and then document my findings below.
I guess the `_refresh` is not needed as long as the time offset (`-5m` in my example above) is greater than the time events spend in the log collection pipeline plus the `refresh_interval` set for the Elasticsearch index?
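As a rough sanity check of that condition, here is a sketch with assumed, illustrative numbers (the pipeline delay and `refresh_interval` values below are made up; measure your own pipeline):

```python
from datetime import timedelta

# Illustrative, assumed values -- measure these for your own setup.
pipeline_delay = timedelta(seconds=90)    # shipping + Logstash + indexing
refresh_interval = timedelta(seconds=30)  # index.refresh_interval
time_offset = timedelta(minutes=5)        # the -5m offset used by the watch

# The offset is safe if it exceeds the worst-case end-to-end latency.
safe = time_offset > pipeline_delay + refresh_interval
print(safe)  # True for these example numbers
```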
I tried to wrap my head around the percolate query and how it would work for my use case, but I don’t fully understand it yet. Just a short question: with the percolate query, is it possible to get all new documents that have been indexed since the last percolate query was executed? I read that it has limitations with regard to running aggregations. My use case is very similar to https://github.com/elastic/examples/tree/master/Alerting/Sample%20Watches/errors_in_logs, except that I run an aggregation and only transform the aggregation result in the watch.
Using the ingest time is an interesting idea. I will give it some more thought. We already have the ingest time added by Logstash for other reasons.
Storing the timestamp of the last processed event is also interesting, but I think it can be avoided with the approach proposed below.
Proposed solution
Date rounding, as mentioned by Christian Dahlqvist, is one part of the solution, I would say. The issue was that I just used the interval schedule and did not read the Watcher Schedule Trigger docs carefully. I was afraid of start times drifting over time; I prevented this by using one of the other trigger schedules.
For writing and testing my watches, I use the integration tests of elastic/examples. However, I cannot yet contribute to Elastic during my work hours because the CLA has not yet been signed by my company, so I document my findings here.
Some thought has been put into verifying that we neither lose any logs nor get duplicates. I am confident that this is achieved as long as the Elasticsearch cluster is always operational and no snapshots are restored or anything like that.
I will paste the relevant parts of two example watches, written in YAML, with the relevant documentation.
Example for triggering every minute
Partial watch README.md
Triggers and date ranges
The watch triggers at second 23 of every minute and selects exactly one minute.
Because the actual time at which the query is executed varies slightly (and determines the value of `now` in the range query), we use date rounding to the full minute.
Because events need some time from when they are emitted at the origin until they have been picked up, have traveled through the log collection pipeline, and have been indexed and refreshed (`_refresh` API) in Elasticsearch, a watch running now does not look at the range from `now-1m` until `now`; instead, an additional delay of 5 minutes is applied.
The first timestamp included is, for example, 2018-04-16T05:59:00.000Z and the last 2018-04-16T05:59:59.999Z.
This can be tested with the following query; just index suitable documents beforehand. 2018-04-16T06:05:00.999Z represents `now`:
```
GET test-2018/_search?filter_path=took,hits.total,hits.hits._source
{
  "sort": [
    "@timestamp"
  ],
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2018-04-16T06:05:00.999Z||-1m-5m/m",
        "lt": "2018-04-16T06:05:00.999Z||-5m/m"
      }
    }
  }
}
```
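The expected boundaries can be double-checked with a small Python sketch that emulates Elasticsearch's date math for this case (`-Xm` subtracts minutes, `/m` rounds down to the full minute; Elasticsearch itself does the authoritative rounding, and `lt` excludes the rounded-down value):

```python
from datetime import datetime, timedelta

def es_date_math(anchor: str, offset_minutes: int) -> datetime:
    """Emulate '<anchor>||-<offset>m/m': subtract minutes, round down to the minute."""
    t = datetime.strptime(anchor, "%Y-%m-%dT%H:%M:%S.%fZ") - timedelta(minutes=offset_minutes)
    return t.replace(second=0, microsecond=0)  # '/m' rounds down to the full minute

now = "2018-04-16T06:05:00.999Z"
gte = es_date_math(now, 6)  # now-1m-5m/m
lt = es_date_math(now, 5)   # now-5m/m (exclusive upper bound)
print(gte.isoformat(), lt.isoformat())
# 2018-04-16T05:59:00 2018-04-16T06:00:00
```

So the last included millisecond is 2018-04-16T05:59:59.999Z, matching the range described above.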
Date selection considerations for integration tests
```
gte: 2018-04-16T12:14:00.000Z, offset -360
lt:  2018-04-16T12:14:59.999Z, offset -300.001
now: 2018-04-16T12:20:00.000Z

gte: 2018-04-16T12:14:00.000Z, offset -419.999
lt:  2018-04-16T12:14:59.999Z, offset -360
now: 2018-04-16T12:20:59.999Z
```
Offset -360 is always in the range. Offsets from -419.999 until (and including) -300.001 are sometimes in the range, depending on `now`. `now` is not mocked by the test framework, so we need to use -360 for all test data, or something outside of the range, to get deterministic tests.
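To illustrate the boundary behavior, here is a sketch using a simplified Python emulation of the watch's date math (the helper names are mine, not from the watch) that checks which offsets fall into the queried range for the earliest and latest possible `now` within the trigger minute:

```python
from datetime import datetime, timedelta

def queried_range(now: datetime):
    """[now-1m-5m/m, now-5m/m): simplified sketch of the watch's date math."""
    floor = lambda t: t.replace(second=0, microsecond=0)  # '/m' rounding
    return floor(now - timedelta(minutes=6)), floor(now - timedelta(minutes=5))

def in_range(now: datetime, offset_seconds: float) -> bool:
    gte, lt = queried_range(now)
    ts = now + timedelta(seconds=offset_seconds)
    return gte <= ts < lt

early = datetime(2018, 4, 16, 12, 20, 0)          # trigger at second 0
late = datetime(2018, 4, 16, 12, 20, 59, 999000)  # latest trigger within the minute

for off in (-360, -419.999, -300.001):
    print(off, in_range(early, off), in_range(late, off))
```

Only -360 is in the range for both extremes of `now`; the two edge offsets flip depending on when the watch actually runs.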
Unfortunately, fully deterministic tests cannot be guaranteed currently, because there is some delay between the offset-to-timestamp calculation and the execution of the watch. If that causes a failure in integration tests, rerun the test. The probability of this happening is very low, roughly 0.3 % (~0.2 s / 60 s * 100).
The offset calculations can be verified with this Python code:
```python
import datetime

(datetime.datetime.strptime('2018-04-16T12:20:59.999Z', "%Y-%m-%dT%H:%M:%S.%fZ")
 - datetime.datetime.strptime('2018-04-16T12:14:00.000Z', "%Y-%m-%dT%H:%M:%S.%fZ")
 ).total_seconds()
# 419.999
```
Partial watch example
```yaml
---
# yamllint disable rule:line-length rule:comments-indentation
metadata:
  ## Use an offset to allow events to travel through their pipeline.
  ## See: https://discuss.elastic.co/t/ensure-that-watcher-does-not-miss-documents-logs/127780/1
  time_offset: '5m'
  time_window: '1m'
## We never want to throttle/drop actions.
throttle_period: '0s'
trigger:
  schedule:
    ## Based on ctx.metadata.time_window.
    ## Everyone triggers jobs at second 0 so this one is triggered at second 23
    ## every minute.
    cron:
      - '23 * * * * ?'
input:
  search:
    ## Based on ctx.metadata.time_window.
    timeout: '40s'
    request:
      body:
        query:
          range:
            '@timestamp':
              gte: 'now-{{ctx.metadata.time_window}}-{{ctx.metadata.time_offset}}/m'
              lt: 'now-{{ctx.metadata.time_offset}}/m'
```
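For illustration, here is a minimal sketch of how the two mustache placeholders resolve given the metadata above (plain string substitution only; Watcher's real mustache templating is more capable):

```python
metadata = {"time_window": "1m", "time_offset": "5m"}

gte_tpl = "now-{{ctx.metadata.time_window}}-{{ctx.metadata.time_offset}}/m"
lt_tpl = "now-{{ctx.metadata.time_offset}}/m"

def render(tpl: str, md: dict) -> str:
    # Replace each {{ctx.metadata.<key>}} placeholder with its value.
    for key, value in md.items():
        tpl = tpl.replace("{{ctx.metadata.%s}}" % key, value)
    return tpl

print(render(gte_tpl, metadata))  # now-1m-5m/m
print(render(lt_tpl, metadata))   # now-5m/m
```

With the default metadata this yields exactly the `now-1m-5m/m` / `now-5m/m` range used in the test query above, and changing `time_window` or `time_offset` in one place adjusts the whole watch.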