Understanding continuous transforms syncing

Hi,

thanks for your interest in continuous transforms and for your very detailed feedback. Let me answer your notes and questions first.

This is not correct. frequency is only a trigger to check for changes; its value is not used for querying the data. When the transform decides that a new run is necessary, it creates a new checkpoint, and the query then uses (old_checkpoint_timestamp, new_checkpoint_timestamp] to detect changes. This is important, because you might have stopped the transform, or the transform might need more than 1m for one run (for a single checkpoint). As checkpoints are persisted, the transform should not miss any data, even if a failure occurs. You can see checkpoint information as part of the _stats API.
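For example, you can inspect the checkpoint state of a transform like this (the transform name is just a placeholder; the checkpointing section of the response contains the persisted checkpoint information):

```
GET _transform/my-transform/_stats
```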

Checkpointing is explained in the docs here. There you will also see that there are two phases: first detecting changes, then applying them. Let me go into more detail after your next note.

This is not correct. Transform runs in two phases, detection and apply. In your use case, using a terms group_by, it works like this:

  • create a new checkpoint, let's call it t_new, and call the former checkpoint t_old
  • transform queries with range (t_old, t_new] for changes
  • assume the changes query returns that transactions a, d, and y changed
  • transform queries using a terms query for a, d, and y in the range (0, t_new]
  • transform indexes the updates for a, d, and y, but not for transactions b, c, ...

Note that the apply phase uses the range (0, t_new], not (t_old, t_new]. This is important so that transform sees all values, e.g. for calculating a duration.
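To make the two phases concrete, here is a rough sketch of the queries transform generates (index, field, and aggregation names are made up for illustration; t_old and t_new stand for the concrete checkpoint timestamps):

```
GET transactions/_search   // phase 1: detect which keys changed in (t_old, t_new]
{
  "size": 0,
  "query": { "range": { "timestamp": { "gt": "t_old", "lte": "t_new" } } },
  "aggs": { "changed_ids": { "terms": { "field": "transaction_id" } } }
}

GET transactions/_search   // phase 2: recompute only the changed buckets over (0, t_new]
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "terms": { "transaction_id": [ "a", "d", "y" ] } },
        { "range": { "timestamp": { "lte": "t_new" } } }
      ]
    }
  },
  "aggs": {
    "by_transaction": {
      "terms": { "field": "transaction_id" },
      "aggs": { "duration": { "max": { "field": "took_ms" } } }
    }
  }
}
```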

A downside is the requirement to keep old data, at least long enough for transform to see all required data. However, if in your case you know that one transaction cannot last longer than a certain amount of time, you can age out data in your source index. You can also use a query to define a lower boundary; that way you can prevent transform from querying e.g. cold nodes.

Change detection has several strategies; for example, we recently introduced an optimization for a special case with date_histogram. The drawing in the issue might be useful for understanding how it works.

Transform does not know how the timestamp of your transaction got created. If you, for example, created it in an application on a device, you have to account for processing time, network transmission, ingest execution, and Lucene refresh. Assume your transaction got created with timestamp t; time passes until transform can see that data point. This is what we call delay, and you should configure delay based on the worst case. There is also the option to move the timestamp used for sync closer to transform, e.g. not using a timestamp created on the device but an ingest timestamp, created in Elasticsearch using an ingest pipeline. Note that even in this case you have a minimum delay: Lucene indexes are refreshed with a 1s interval by default.
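As a minimal sketch, an ingest pipeline that stamps each document with an ingest timestamp could look like this (pipeline and field names are examples only):

```
PUT _ingest/pipeline/add-ingest-timestamp
{
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
```

You would then point the transform's sync field at event.ingested instead of the device timestamp.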

If we look at the phases again: when I said transform creates a checkpoint, it creates that checkpoint not using now() as the timestamp but now() - delay. You can see this in the response body of _stats in the field time_upper_bound_millis.
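The delay is configured in the sync section of the transform; a sketch (field name and value are examples, not recommendations):

```
"sync": {
  "time": {
    "field": "event.ingested",
    "delay": "120s"
  }
}
```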

That is not correct, as I explained. Transform works in two phases: only the detection phase uses a two-sided window; the apply phase uses a one-sided window.

I hope that makes it clearer. If you are interested in seeing this in action, you can change the log level of the transform indexer to see the queries it executes (this creates log spam, so only do it in a dev environment):

PUT /_cluster/settings
{
   "transient": {
      "logger.org.elasticsearch.xpack.transform.transforms": "trace"
   }
}

Last but not least, I'd like to comment on your configuration:

I suggest deleting both. The exists query is redundant: the terms grouping will drop buckets without a key anyway (the next scheduled release will support grouping null buckets, but that has to be enabled using a parameter).

The range query is hostile to transform; it's dangerous because you can break change detection with it. I hope that becomes clear with the explanations above. Never use lte or lt on the same field that is used for sync; this can cause data corruption. For gte or gt there is a valid use case: if you know that a transaction cannot last longer than e.g. 1d, you can use now-1d to narrow the query. Search will not return different results, but you avoid some network calls in a bigger cluster that uses a hot-warm architecture.
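A sketch of such a safe lower boundary in the transform's source query, assuming a transaction never lasts longer than one day (index and field names are examples only):

```
"source": {
  "index": "transactions",
  "query": {
    "range": {
      "timestamp": { "gte": "now-1d" }
    }
  }
}
```

Note that this only uses gte, never lte or lt, on the sync field, so change detection stays intact.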

Finally, transform, as well as Elasticsearch itself, applies optimizations under the hood whenever possible. By trying to "help" Elasticsearch with query filters, you might actually achieve the opposite effect. Start simple and introduce optimizations only after you have measured their effect.
