Periodically query ES to store pre-computed totals - how?

I have some 20 billion log lines per year going into an ES cluster, and am working on producing 5-minutely snapshots of several metrics to reduce compute cost at query time. Every 5 minutes I will calculate and store, for each of our 200 or so clients:

  • arrays for cardinality aggs, with precomputed hashes (eg unique client IPs)
  • totals for stats-based aggs, like count, sum, etc (eg hits, kb downloaded)
  • arrays of objects for terms aggs, with approx double the number of terms ultimately required - to reduce terms agg errors. (eg popular URLs, UAs)

This will reduce 20 billion records and a few TB (annually) to a few hundred thousand (much smaller) records and a few hundred MB, which I'll be able to keep in a single in-memory shard on a separate instance.
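For illustration, a single rolled-up document per client per interval might look something like this (all field names here are made up for the example; the arrays feed cardinality and terms aggs at query time, and the numeric totals feed sum aggs):

```
{
  "client": "client-042",
  "interval_start": "2017-03-27T10:05:00Z",
  "hits": 18342,
  "kb_downloaded": 904532,
  "client_ips": ["203.0.113.10", "198.51.100.7", "192.0.2.44"],
  "top_urls": [
    { "k": "/index.html", "v": 4110 },
    { "k": "/assets/app.js", "v": 2895 }
  ],
  "user_agents": [
    { "k": "Firefox", "v": 9120 },
    { "k": "Chrome",  "v": 7630 }
  ]
}
```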

What I'm wondering about is the best way to do the 5-minutely query. Logstash's elasticsearch input doesn't appear to have what I need, IIUC. I could write something to do the queries, but perhaps there's something that can help me achieve this in a more "off the shelf" way?

I need to catch new data going forward, and also to backfill for the last few years from existing logs.

Thanks for reading,
Tom

We don't have a good solution for this. We've been brainstorming things but we're really not sure yet.

As it panned out, we've written a fairly nasty (speaking frankly) shell script to calculate date rounding for either current or backfill periods, build a single unified query per interval, and send the modified output back to logstash and ultimately to a separate (client-facing) ES cluster. We pull data with a top-level terms agg to split by client, then various sub aggs as required by the eventual visualisations.
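The core of the script is roughly this shape - a minimal sketch, with placeholder index names, field names and sizes; the real thing also builds backfill ranges and reshapes the agg response into bulk-indexable documents:

```
#!/bin/sh
# Sketch of the per-interval query. Index, field names and sizes are placeholders.
NOW=$(date -u +%s)
END=$(( NOW / 300 * 300 ))    # round down to the last whole 5-minute boundary
START=$(( END - 300 ))

curl -s -XPOST 'http://es:9200/logs-*/_search?size=0' \
  -H 'Content-Type: application/json' -d '{
  "query": { "range": { "@timestamp": { "gte": '"$START"', "lt": '"$END"', "format": "epoch_second" } } },
  "aggs": {
    "by_client": {
      "terms": { "field": "client", "size": 500 },
      "aggs": {
        "kb_downloaded": { "sum":   { "field": "kb" } },
        "client_ips":    { "terms": { "field": "clientip", "size": 20000 } }
      }
    }
  }
}'
```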

Despite the ugliness of the approach, it's achieving what we hoped. Queries to the raw data are light, and response times against the new index have dropped, in some cases (on equivalent queries), from around 40s to around 50ms. The precomputed index is now about 50GB compared to about 40TB for the raw data. It's still early and we're expecting these numbers to improve further before it goes live; for example, we'll try to get the index size down to something that can be stored entirely in RAM.

We've used the fingerprint filter in logstash to enable safe replay of logs and "at least once" delivery. Every 5m we query for the 2 previous 5m windows, potentially updating the penultimate window in the process.
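The idempotency piece is just the fingerprint filter feeding the document_id of the elasticsearch output, so a replayed interval overwrites rather than duplicates - something along these lines (field names are placeholders, and option names may vary slightly between plugin versions):

```
filter {
  # Deterministic ID per client + interval, so replays overwrite existing docs.
  fingerprint {
    source => ["client", "interval_start"]   # hypothetical field names
    target => "[@metadata][fingerprint]"
    method => "SHA256"
    key    => "precomp"                      # any constant key will do
    concatenate_sources => true
  }
}

output {
  elasticsearch {
    hosts       => ["precomp-es:9200"]
    index       => "pc-5m"
    document_id => "%{[@metadata][fingerprint]}"
  }
}
```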

Terms aggs now suffer much the same problem as is encountered with sharding, even though we're using a single shard, but we attempt to reduce the error by pulling twice the number of terms per interval as we'll actually query (the ratio will be subject to tuning later). Cardinality and sum values (counts in the source data become sums in the new data) seem to benefit more unambiguously from our approach - completely accurate, and far, far quicker.
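To make the count→sum point concrete, against the precomputed index the old "how many hits" question becomes a sum over the stored per-interval totals, along these lines (index and field names follow the earlier hypothetical sketch):

```
POST /pc-5m/_search?size=0
{
  "query": {
    "bool": {
      "filter": [
        { "term":  { "client": "client-042" } },
        { "range": { "interval_start": { "gte": "2017-03-01", "lt": "2017-04-01" } } }
      ]
    }
  },
  "aggs": {
    "total_hits": { "sum": { "field": "hits" } },
    "total_kb":   { "sum": { "field": "kb_downloaded" } }
  }
}
```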

Some queries over longer periods can still take a few seconds. It's likely we'll do a second round of precomputation to create daily 'summaries', so that a query of eg 2 years hits ~700 records rather than ~200,000. In one concrete example, calculating the cardinality of client IPs over a year for a particular site (with a cardinality of 4 million) takes about 12 seconds. Previously, that would've brought our cluster to its knees! So it's an improvement, but precomputing daily summaries from the 5-minutely data would make it trivial. We haven't tested precomputation of hashes, but from what I've read it will make little difference on a numeric field.
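The daily second round would just be the same trick applied to the 5-minutely index - something like this per client per day (again with the hypothetical field names), with the daily_ips keys and the daily_hits value indexed into a new pc-1d document:

```
POST /pc-5m/_search?size=0
{
  "query": {
    "bool": {
      "filter": [
        { "term":  { "client": "client-042" } },
        { "range": { "interval_start": { "gte": "2017-03-27", "lt": "2017-03-28" } } }
      ]
    }
  },
  "aggs": {
    "daily_ips":  { "terms": { "field": "client_ips", "size": 100000 } },
    "daily_hits": { "sum":   { "field": "hits" } }
  }
}
```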

We chose to store one record per client per interval, which appears to be more performant than alternatives but does mean reindexing everything if we change the structure. Having a separate type for each type of data we want would've been a lot more flexible, but we figure that being able to issue a single query for all the data, and therefore use a single filter, is worth it.

Hey @tomr, I just wanted to say thanks for your notes! Seeing this is really helpful as we brainstorm ways to help.

How are you calculating the cardinality in this case? Just storing an array of values for each "rolled up" document, so that the cardinality agg can grab all distinct values for that interval?

Exactly!

And because the cardinality data doesn't lose any accuracy this way, we can easily use the 5-minutely data to precompute daily arrays, reducing by a factor of roughly 300 (ie 12 * 24) the number of records that need to be looked at for longer queries. Calculating cardinality on arrays appears (ie not properly tested!) to be quick.
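So at query time a distinct count is just a normal cardinality agg over the stored arrays, eg (hypothetical names again):

```
POST /pc-1d/_search?size=0
{
  "query": {
    "bool": {
      "filter": [
        { "term":  { "client": "client-042" } },
        { "range": { "interval_start": { "gte": "2016-04-01", "lt": "2017-04-01" } } }
      ]
    }
  },
  "aggs": {
    "unique_ips": { "cardinality": { "field": "client_ips" } }
  }
}
```

The rollup itself loses nothing, since all the raw values are still there; the cardinality agg is of course still subject to its usual HyperLogLog++ approximation at very high cardinalities.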

In our use case we mandate whole days as the minimum interval for longer query ranges, but if we wanted to benefit from an additional, longer precomp interval while still using the shorter-interval data for arbitrary start/end times, I'd guess we'd need to separate the shorter-interval data into eg daily indices. Then - where pc-<period>-<date> is the name of a precomputed index with a 'periodly' interval - we could query longer intervals bookended with shorter intervals, for example:

http://es:9200/pc-5m-2017.03.27,pc-1d,pc-5m-2017.03.29/....

I'm way out of my depth with how filter cache internals work, but I suspect that by splitting into separate indices (per day / week) we might be able to take better advantage of the filter cache, because filters on the timestamp field won't (/might not) keep getting invalidated by new data. I'm not sure how best to optimise for query caching here. I have a vague suspicion that we could modify our queries to make separate requests, then combine the results, in order to preserve old filters - with much the same results as separating the indices. Or using separate filters within a single query...

But the thing that's proving most problematic is data from terms aggs - I wonder if you have any ideas you could share on those? Thoughts so far:

  • Our current approach is to query the source data with terms:{"field":"somefield"} every 5m and store all the key + doc_count responses. (We use one big query for everything at source, with multiple terms aggs - among others.)
  • Even within a single shard, the same errors that can occur due to sharding will be exacerbated by this approach (although the actual error rate is dependent on the spread/cardinality of the source data - for consistent and/or low cardinality fields there's no issue). Initially I've tried to minimise this by pulling more terms than are ultimately needed... a quantitative improvement but not a qualitative one.
  • Storing extra terms (ie larger size on the source query) is really expensive in terms of storage and query performance
  • Using a nested->terms->sum agg to calculate everything 'properly' on {"field":[{"k":"keyname","v":count},...]} nested data is really expensive (see the sketch after this list)
  • To ease queries, and to reduce storage reqs, where possible we will switch from our starting point of statically mapped {"field":[{"k":"keyname","v":count},...]} to dynamically mapped {"field":[{"keyname":count},...]} - basically for any field where the keys are bounded, even if they aren't all known. Fields where this applies in our web log data are:
    ** output of the useragent logstash filter plugin
    ** http status codes
    ** cache response (from apache traffic server)
    ** geoip country name
  • We will need to be more careful about garbage getting into the logs with a resulting undesirable expansion of mappings
  • The main field where this won't work is request URLs. Long strings, hugely variable... ugh.
  • There may be a more efficient way to store and/or query that we haven't thought of - still looking.
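For reference, the 'expensive but correct' option mentioned above looks roughly like this: a nested mapping for the stored terms, then a nested -> terms -> sum agg to re-aggregate across intervals (mapping type, index and field names are illustrative; ES 5.x syntax):

```
PUT /pc-5m
{
  "mappings": {
    "rollup": {
      "properties": {
        "top_urls": {
          "type": "nested",
          "properties": {
            "k": { "type": "keyword" },
            "v": { "type": "long" }
          }
        }
      }
    }
  }
}

POST /pc-5m/_search?size=0
{
  "query": { "term": { "client": "client-042" } },
  "aggs": {
    "urls": {
      "nested": { "path": "top_urls" },
      "aggs": {
        "by_url": {
          "terms": {
            "field": "top_urls.k",
            "size": 10,
            "order": { "total": "desc" }
          },
          "aggs": {
            "total": { "sum": { "field": "top_urls.v" } }
          }
        }
      }
    }
  }
}
```

The dynamically mapped alternative trades this for a flat object per field and plain sum aggs per key, at the cost of mapping growth if garbage keys leak in.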

Enough for now, must get back to the lab!
hth

(Taking the rest of what you said in that paragraph into account) In ES 5.X we do agg caching in a more intelligent manner, see Shard request cache settings | Elasticsearch Guide [8.11] | Elastic
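You can also force it per request with the request_cache flag, eg (index and field names just follow the sketches above):

```
POST /pc-5m/_search?size=0&request_cache=true
{
  "aggs": {
    "total_hits": { "sum": { "field": "hits" } }
  }
}
```

size=0 requests are cached by default from 5.0, and entries are only invalidated when a refresh actually changes the shard's data, so older time-based indices that are no longer being written cache particularly well.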

Filters, though, should be fine even with time-based data. There will be some small overhead for new data, but the filter process is pretty efficient regardless.
