Transforms: do I need to filter source for time-series data?

I'm transforming time-series/event data where past data never changes.

I've talked to an Elastic engineer, Ben, on Slack, and it seems like I might have stumbled upon a bug.

I'm using this index config and this transform config.

The problem is that each iteration seems to be processing all the data. I've been running the transform on our indices (stats-[date]) and here are the stats:

  • 204B documents processed
  • 14 min total indexing time
  • 108 hours total search time
  • 25 seconds total processing time
  • "average" documents processed: 221M
  • "average" checkpoint duration: 7 min

We use Curator to delete old indices, so we always have 6 indices, each with under 50M docs. If I sum them up, it currently comes to 237M, right around the average of each run. So it looks like it processes all documents each time.

Ben says that there have been some improvements since 7.7.0, but I'm on 7.9.1.

Here's my CPU utilization and load for the week since turning it on.

Since then, I have modified my transform to filter on the timestamp field; here's the new transform with a range query in the source. This time, however, it seems to be running fine. Is this expected behaviour?
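
For context, the new transform looks roughly like this (a sketch with placeholder transform/index names, and with the error-code aggregation left out; the real config is in the link above):

PUT _transform/stats_daily
{
   "source": {
      "index": ["stats-*"],
      "query": {
         "range": { "timestamp": { "gte": "now-3h" } }
      }
   },
   "dest": { "index": "stats-daily" },
   "frequency": "60m",
   "sync": {
      "time": { "field": "timestamp", "delay": "90m" }
   },
   "pivot": {
      "group_by": {
         "day": { "date_histogram": { "field": "timestamp", "fixed_interval": "1d" } },
         "customer": { "terms": { "field": "customer" } }
      },
      "aggregations": {
         "credits": { "sum": { "field": "credits" } },
         "requests": { "value_count": { "field": "timestamp" } }
      }
   }
}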

Transform minimizes the amount of data that needs to be re-processed as explained in the docs.

In your case transform looks for customers that have changed and time buckets that have changed. Here is the problem: you have a fixed interval of 1d, so that's a rather large bucket. On the other hand, you run the transform every 10m. Transform will recalculate the intermediate bucket of "today" every 10 minutes, which means it will rewrite this bucket 144 times until it decides to ignore it. To reduce the rewrites you can set frequency to a higher value, i.e. run the transform less often.
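
For example, roughly like this (a sketch; the transform id is a placeholder, and depending on your setup you may need to stop and restart the transform for the change to take effect):

POST _transform/stats_daily/_update
{
   "frequency": "60m"
}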

Do you need the intermediate bucket results at a 10 minute frequency?

Or you could change the date_histogram to less than a day; this will produce more documents but avoid a lot of rewrites. To get back to daily results you can use aggregations on the transform destination index.
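
For example, if you let the transform group hourly, a sketch of getting daily results back from the destination index at query time (index and field names assumed to match your config):

GET stats-daily/_search
{
   "size": 0,
   "aggs": {
      "per_day": {
         "date_histogram": { "field": "day", "fixed_interval": "1d" },
         "aggs": {
            "credits": { "sum": { "field": "credits" } },
            "requests": { "sum": { "field": "requests" } }
         }
      }
   }
}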

In your new configuration you use a range query to limit the amount of data transform reads; however, you still group daily while only allowing a 3-hour window back in time. This will produce wrong results, because the last bucket will not contain all data of the day. Combining delay and the range query, it will only contain 180m - 90m = 90m of data.

Please check your results in the transform index. It runs faster, but I bet it does not produce correct results.

If you are curious about what transform does and which queries it sends, you can enable debug logging:

PUT /_cluster/settings
{
   "transient": {
      "logger.org.elasticsearch.xpack.transform.transforms": "debug"
   }
}

Given your 1st transform, you should see that transform adds a range query so that it does not re-process buckets from previous days. (Note: your day starts at 01:30, because you set delay to 90m, which means transform expects that data can be up to 90m late, so a data point from yesterday can still arrive at e.g. 01:30 today.)

Given your 2nd transform you should see 2 range queries combined.

Don't forget to set the log level back to e.g. info or warning, whatever your preferences are.
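
For example, setting the logger to null removes the transient override and falls back to the default level:

PUT /_cluster/settings
{
   "transient": {
      "logger.org.elasticsearch.xpack.transform.transforms": null
   }
}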

Yes, that's why I set it to 1 hour in my new config. That is still too low for me because, as you pointed out, I'm going to be calculating the same day many times.

Not sure what you mean here. What's an intermediate bucket result? I need the transformed data in 1-day buckets, and I don't care much about the delay or if it comes in with some latency.

Possibly, but that's not what we currently want: it would increase query complexity (we'd have to aggregate again) and increase the storage needed, and we'd like to retain the data for a long time.

Ah, you're right. So, if we do find out that transforms search all the docs all the time, I would need to set this to something like 26 hours (24 hours for one bucket + 1.5h for the delay).
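
In other words, roughly this in the source query (sketch):

"query": {
   "range": { "timestamp": { "gte": "now-26h" } }
}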

I'll turn on debug logging in my staging cluster and check things out. But the main thing I don't think you touched on is whether it's expected that, if I don't add a filter, Elasticsearch will scan all the documents every time.

That last bucket, the bucket of today, remains incomplete until the day is over, so it is an "intermediate result". I understand you aren't looking for those and have already set frequency to the maximum possible value. A bucket of today worst-case still gets re-calculated 24 times a day, but at least not 144 times as before.

We have plans to improve scheduling, which will let you run the transform at certain times of day, e.g. only once a day.

That's not correct; transform will not do a full scan of all documents. Have you checked the link in my 1st post? The docs explain it; check out the section about change detection heuristics.

If you prefer a more technical deep-dive, this pull request has some numbers. In another pull request you'll find a simple drawing of how change detection works.

Nevertheless, you should be able to confirm/see it in action by turning on debug logging.

I have read the docs. Twice. They're pretty terse, so it didn't take me much time. Have you read my original post? Did you see my numbers? It looks like, with a high degree of certainty, it's processing every document.

Okay, so I now switched to our staging cluster to enable debugging as you suggested. Here's my list of slightly modified commands for staging. Notable changes are that I run it every minute with a one minute delay and without a filter. Then I ran the following to get the logs that mention the docs processed: docker logs stats_elasticsearch_1 | grep docs_processed | jq ".message".

The latest number is 2144822; you can check more of the docker logs here. Here are my indices:

Clearly it adds up:
31601+201998+162229+400426+1350592 = 2146846 ~= 2144822

I checked the code again and realized that for 7.9.1 the query is only logged using trace logging.

That's the line of interest.

So in order to get this line you need:

PUT /_cluster/settings
{
   "transient": {
      "logger.org.elasticsearch.xpack.transform.transforms": "trace"
   }
}

It would be great if you could re-run the test and check the logs for this line; you should see a range query with bucket boundaries, plus your filter query if you have one.

Tracing logs here.

I've parsed some of the logs and I see two queries. One is called the changes query, and this one looks fine:

{"size":0,"query":{"bool":{"filter":[{"match_all":{"boost":1.0}},{"range":{"timestamp":{"from":1623877564261,"to":1623877624260,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"_transform":{"composite":{"size":500,"sources":[{"customer":{"terms":{"field":"customer","missing_bucket":false,"order":"asc"}}}]}}}}

You can see the range with a from and a to. Good. But then there's the second, actual search query:

{"size":0,"query":{"bool":{"filter":[{"match_all":{"boost":1.0}},{"range":{"timestamp":{"from":null,"to":1623877624260,"include_lower":true,"include_upper":false,"format":"epoch_millis","boost":1.0}}},{"bool":{"filter":[{"terms":{"customer":["_xyz_"],"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"_transform":{"composite":{"size":500,"sources":[{"day":{"date_histogram":{"field":"timestamp","missing_bucket":false,"value_type":"date","order":"asc","fixed_interval":"1d"}}},{"customer":{"terms":{"field":"customer","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"credits":{"sum":{"field":"credits"}},"requests":{"value_count":{"field":"timestamp"}},"error_codes":{"scripted_metric":{"init_script":{"source":"state.responses = [:]","lang":"painless"},"map_script":{"source":" def key_name = 'missing'; if (doc['error.code'].size() != 0) { def error_code = doc['error.code'].value; if (error_code >= 400 && error_code <= 499) { key_name = '4xx'; } else if (error_code >= 500 && error_code <= 599) { key_name = '5xx'; } else { key_name = error_code; } }||| if (!state.responses.containsKey(key_name)) { state.responses[key_name] = 0; } state.responses[key_name] += 1; ","lang":"painless"},"combine_script":{"source":"state.responses","lang":"painless"},"reduce_script":{"source":" def counts = [:]; for (responses in states) { for (key in responses.keySet()) { if (key == 'missing') { continue; } if (!counts.containsKey(key)) { counts[key] = 0; } counts[key] += responses[key]; } } return counts; ","lang":"painless"}}}}}}}

This is the one that also runs my Painless code, and I think it's the one that gets the actual data. Notice how from is null?

Thank you! I found it.

I updated this issue; for some reason the fix did not make it into 7.9.

Your options are

  • upgrade to 7.10 or later
  • I verified that if you change your config to use the same input and output field name, the optimization works:
        "group_by": {
            "timestamp": {"date_histogram": {"field": "timestamp", "fixed_interval": "1d"}},
...

TL;DR:

A query that applies the optimization contains something like this:

...
        {
          "range": {
            "timestamp": {
              "from": null,
              "to": 1623912178447,
              "include_lower": true,
              "include_upper": false,
              "format": "epoch_millis",
              "boost": 1
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "range": {
                  "timestamp": {
                    "from": 1623888000000,
                    "to": null,
                    "include_lower": true,
                    "include_upper": true,
                    "format": "epoch_millis",
                    "boost": 1
                  }
                }
              },
              {
                "terms": {
                  "customer": [
                    "xyz"
                  ],
                  "boost": 1
                }
              }
...

The 1st range query sets the upper bound of the checkpoint; the other 2 filters are the change queries, one for each group_by. The range query inside the bool (that's the one you were missing) rounds down the lower bound to the bucket boundary: 1623888000000.

I am sorry this slipped through the release process; the issue applies only to the 7.9 series.

Nice, modifying the grouping seems to be better. One downside that remains is that, even when I set it to one hour, it will still process the whole day, but I think I can live with that. Great, thanks for the help.

Great, this helps. Further performance improvements are coming.

BTW, your response-code aggregation can also be implemented using filter aggregations, like in this example. I don't think it matters much in terms of performance; however, it might simplify your configuration.
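
Something along these lines, as an untested sketch using your error.code field:

"aggregations": {
   "responses_4xx": {
      "filter": { "range": { "error.code": { "gte": 400, "lte": 499 } } }
   },
   "responses_5xx": {
      "filter": { "range": { "error.code": { "gte": 500, "lte": 599 } } }
   }
}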
