I'm using pivot transforms to group data by a date histogram and a few other terms dimensions. The transforms are running in continuous mode using the ingest timestamp as the sync.time.field but I'm grouping on another date field. The reason I'm doing this is that I'm expecting a huge delay in data transmission (data is coming from embedded systems with frequent network issues). I'm grouping on the "reading" date but the "ingest" date can be days later and I don't want the transform to miss documents. This works well, even when data comes in very late.
I've noticed that the transforms do not use checkpoint alignment in this case. After a quick look at the code, it seems that the optimization is only triggered when the date histogram uses the same field as the one used for synchronization.
Is there any chance I can still get aligned checkpoints in this case, considering that I'm not interested in interim buckets anyway?
If not, maybe someone has a similar use case and is doing things differently. I'm open to challenging my implementation if anyone has advice. Please feel free to ask for more details on my cluster, transforms, use case, etc.
By aligning checkpoints with the bucket boundaries of the date_histogram, the transform performs fewer updates on the destination index, so this is essentially a performance optimization. As you said, it requires that the sync field and the date_histogram field are the same.
If they are not, as in your case, it gets tricky. As you reported, data can arrive out of order, even days later; that's why the ingest timestamp helps here. However, this can't be combined with alignment, and the reason is checkpointing. A checkpoint is basically a snapshot, a point in time, and the transform ensures that all data gets transformed up to that checkpoint. When running in continuous mode, the transform basically starts by creating a new checkpoint, then queries for all data between the last checkpoint and the one just created, and then runs queries to update only the necessary delta. Here we have a disconnect between the sync field and the field you use in your date_histogram: because data can be out of order, the transform simply doesn't know how to align buckets of the date_histogram. And there is another problem: if it were to, say, round down to a bucket boundary, it would need to update that bucket again on the next checkpoint. I've spent a lot of time thinking about this, but I don't see a solution for it.
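To make that disconnect concrete, here is a minimal sketch of such a continuous transform (index and field names are placeholders, not taken from your setup): checkpoints advance along one date field while the buckets are keyed on another, so they cannot be aligned.

```
PUT _transform/readings_per_day
{
  "source": { "index": "readings-*" },
  "dest":   { "index": "readings_per_day" },
  "frequency": "5m",
  "sync": {
    // checkpoints advance along the ingest timestamp...
    "time": { "field": "ingest_time", "delay": "5m" }
  },
  "pivot": {
    "group_by": {
      // ...while buckets are keyed on the (possibly much older) reading date
      "reading_day": {
        "date_histogram": { "field": "reading_date", "calendar_interval": "1d" }
      },
      "device": { "terms": { "field": "device_id" } }
    },
    "aggregations": {
      "readings": { "value_count": { "field": "device_id" } }
    }
  }
}
```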
However, transform has an optimization for this case, introduced in this PR for 7.11. It uses aggregations in the change query to limit the buckets it has to rewrite: by using a min aggregation it knows the lower bound of the buckets to change (note that this bound has to be rounded down). With this, the transform rewrites all buckets starting from the data point with the lowest value of the date_histogram field. Let's assume you have data for one device coming in 5 days late and you have a daily histogram; the transform would then rewrite the buckets for the last 5 days. Min is just a simple heuristic here: if for some reason a device sends data 100 days late, the transform would rewrite 100 buckets. That's a corner case and a weakness for sure, but hopefully a rare one (it seems you experienced something like that, because you say "data comes in very late"). Note that this problem can also be caused by wrong dates; I would consider filtering those data points at ingest using some simple plausibility checks to mitigate it.
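If you go the plausibility-check route, one option is a drop processor in an ingest pipeline. A rough sketch, assuming the reading date and the ingest time are ISO-8601 strings with a timezone, with placeholder field names and an arbitrary 30-day threshold:

```
PUT _ingest/pipeline/reading-plausibility
{
  "description": "Sketch: drop readings that lag implausibly far behind the ingest time",
  "processors": [
    {
      "set": {
        "field": "ingest_time",
        "value": "{{{_ingest.timestamp}}}"
      }
    },
    {
      "drop": {
        "if": "ctx.reading_date != null && ChronoUnit.DAYS.between(ZonedDateTime.parse(ctx.reading_date), ZonedDateTime.parse(ctx.ingest_time)) > 30"
      }
    }
  ]
}
```

Tagging the document with a set processor instead of dropping it would keep the data while still letting the transform's source query exclude it.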
Long story short, I don't see a way to combine checkpoint alignment and ingest timestamps.
Having said that, what is your motivation for this question?
I guess it is optimizing throughput. There might be other ways to improve your use case. If you share some more details I might be able to provide some suggestions. I assume you already read our scaling guide.
Thx for the quick reply. The objective is more about reducing CPU load and the underlying disk IOPS by limiting the number of document updates. The throughput of the transforms is fine for now.
The complete story is that I initially started by using my "reading" date as the sync field and the CPU usage was lower, down to a point where it was barely noticeable that transforms were running. Of course, they quickly started to miss documents due to the lag in transmission... After switching to the ingest timestamp as the sync field, CPU usage ramped up.
Some numbers:
6 data nodes, which all have the transform role too
Data nodes have 20 CPUs (Xeon), 64 GB RAM, and automatic heap allocation (~31 GB allocated)
I run ~300 transforms, spread out at ~50 per data node
CPU usage is ~10% per node without transforms, ~30% with transforms running
Performance is currently not an issue; we're just using some CPU. I'm thinking long term here: the service will scale in the future with more devices deployed, more data collected, more transforms, etc.
I can share a complete transform JSON if you need it.
Thanks for the reply. I would like to know what interval and frequency you are using for the transforms in question. Or feel free to share the complete JSON.
Thinking about the difference between aligned checkpoints and your setup, I can only imagine the problem I tried to describe: a document with a rather old timestamp (in the date_histogram field) arrives late and triggers the re-creation of a lot of buckets. Reducing the frequency and/or increasing the delay might help.
What other optimizations do you use?
Did you disable _source?
It might also help to relax the refresh_interval on the transform destination indices (i.e., refresh less often).
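For reference, a sketch of what those knobs look like. The index name and values are illustrative only, _source can only be disabled when the destination index is created (and comes with trade-offs), and the second call assumes your version allows updating frequency and sync through the transform update API:

```
// Destination index: refresh less often than the 1s default, optionally drop _source
PUT readings_per_day
{
  "settings": { "index": { "refresh_interval": "60s" } },
  "mappings": { "_source": { "enabled": false } }
}

// Transform: run less often and wait longer for late-arriving data
POST _transform/readings_per_day/_update
{
  "frequency": "10m",
  "sync": { "time": { "field": "ingest_time", "delay": "10m" } }
}
```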
I'm using 5m for both the frequency and the delay. The reason for the delay is that the destination indices have a 5m refresh interval too. The source isn't disabled on the destination; I'm using it for some other purposes. I've run some tests with the new synthetic source feature, but the fact that it flattens nested arrays (though I perfectly understand why it does so) is currently blocking us. Search was also slower with synthetic source enabled.
Here's a sample transform, for your reference. A few details:
metadata.readAt is the reading date
metadata.writtenAt is the ingest timestamp
metadata.uuid is each item unique identifier
metadata.techno, metadata.type and metadata.token are other dimensions used for grouping
The gte range in the source query is to avoid running on the full historical data. As I'm doing lots of tests, this avoids long initial runs when the transform is reset
The scripted_metric aggregation is there to get the latest document (part of the document, actually) per bucket. Definitely looking forward to transforms supporting top_hits with source filtering
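The JSON itself isn't reproduced here, but purely as an illustration, a transform matching the description above could look roughly like this; the index names, interval, range value and script bodies are my assumptions, not the actual configuration:

```
PUT _transform/device_latest_readings
{
  "source": {
    "index": "readings-*",
    "query": {
      "range": { "metadata.writtenAt": { "gte": "2024-01-01" } }
    }
  },
  "dest": { "index": "device_latest_readings" },
  "frequency": "5m",
  "sync": { "time": { "field": "metadata.writtenAt", "delay": "5m" } },
  "pivot": {
    "group_by": {
      "readAt": { "date_histogram": { "field": "metadata.readAt", "calendar_interval": "1d" } },
      "uuid":   { "terms": { "field": "metadata.uuid" } },
      "techno": { "terms": { "field": "metadata.techno" } },
      "type":   { "terms": { "field": "metadata.type" } },
      "token":  { "terms": { "field": "metadata.token" } }
    },
    "aggregations": {
      // keep the newest document (by reading date) seen in each bucket
      "latest_doc": {
        "scripted_metric": {
          "init_script": "state.ts = 0L; state.doc = null",
          "map_script": "def ts = doc['metadata.readAt'].value.toInstant().toEpochMilli(); if (ts > state.ts) { state.ts = ts; state.doc = new HashMap(params['_source']); }",
          "combine_script": "return state",
          "reduce_script": "def best = null; long ts = 0L; for (s in states) { if (s.ts > ts) { ts = s.ts; best = s.doc; } } return best"
        }
      }
    }
  }
}
```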
Looks good. The scripted metric aggregation that picks the latest document could potentially be replaced by top_metrics. Is there a 1-to-many relation between uuid and techno, type and/or token? If it's 1:1, only group by uuid and use top_metrics for the others. At least in this transform, some of them seem to be static due to the filter query. You could also set them using an ingest pipeline that you attach to dest.
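As an illustration of that suggestion, a preview with uuid as the only group_by and a top_metrics aggregation sorted on the reading date could look like this (assuming techno, type and token are keyword fields; everything else is a sketch, not your actual config):

```
POST _transform/_preview
{
  "source": { "index": "readings-*" },
  "pivot": {
    "group_by": {
      "uuid": { "terms": { "field": "metadata.uuid" } }
    },
    "aggregations": {
      // values of the other dimensions taken from the latest reading
      "latest": {
        "top_metrics": {
          "metrics": [
            { "field": "metadata.techno" },
            { "field": "metadata.type" },
            { "field": "metadata.token" }
          ],
          "sort": { "metadata.readAt": "desc" }
        }
      }
    }
  }
}
```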
Thx for the reply.
Unfortunately, my uuid field is only guaranteed to be unique for a given techno, type and token combination, hence the grouping on all dimensions. It also makes the transform hash all of them into the generated document _id, which avoids duplicates across types due to the not-so-unique uuid (been there).
I'll look into top_metrics again, thx for the hint.