Transform not aligning checkpoints with date histogram

Hi all.

I'm using pivot transforms to group data by a date histogram and a few other terms dimensions. The transforms run in continuous mode using the ingest timestamp as the sync.time.field, but I'm grouping on another date field. The reason is that I'm expecting huge delays in data transmission (the data comes from embedded systems with frequent network issues). I group on the "reading" date, but the "ingest" date can be days later, and I don't want the transform to miss documents. This works well, even when data comes in very late.
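To illustrate, here's a stripped-down sketch of the relevant parts of one of my transforms: metadata.writtenAt is the ingest timestamp used for syncing, while metadata.readAt is the reading date used in the date histogram, so the two fields differ.

{
  "sync": {
    "time": {
      "field": "metadata.writtenAt",
      "delay": "5m"
    }
  },
  "pivot": {
    "group_by": {
      "metadata.readAt": {
        "date_histogram": {
          "field": "metadata.readAt",
          "fixed_interval": "15m"
        }
      }
    }
  }
}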

I've noticed that the transforms do not use checkpoint alignment in this case. After a quick look at the code, it seems the optimization is only triggered when the date histogram uses the same field as the one used for synchronization.

Is there any chance I can still get aligned checkpoints in this case, considering that I'm not interested in interim buckets anyway?
If not, maybe someone has a similar use case and is doing things differently. I'm open to challenging my implementation if anyone has advice. Please feel free to ask for more details on my cluster, transforms, use case, etc.

Keep up the good work!

Cheers,
David

Hi,

By aligning checkpoints with bucket boundaries in the case of a date_histogram, transform does fewer updates on the destination index, so this is essentially a performance optimization. As you said, it requires that the sync field and the date_histogram field are the same.

If they are not, as in your case, it gets tricky. As you reported, data can arrive out of order, even days later; that's why the ingest timestamp helps here. However, this can't be combined with alignment, and the reason is checkpointing. A checkpoint is basically a snapshot, a point in time, and transform ensures that all data gets transformed up to that checkpoint. When running in continuous mode, transform starts by creating a new checkpoint, then queries for all data between the last and the just-created checkpoint, and then runs queries to update only the necessary delta. Here we have a disconnect between the sync field and the field you use in your date_histogram: because data can be out of order, transform simply doesn't know how to align buckets of the date_histogram. And there is another problem: even if it were to, say, round down to a bucket boundary, it would still need to update that bucket on the next checkpoint. I've spent a lot of time thinking about this, and I don't see a solution for it.

However, transform has an optimization for this case, introduced in this PR for 7.11. It uses aggregations in the change query to limit the buckets it has to rewrite. By using a min aggregation it knows the lower bound of the buckets to change (note that this bound has to be rounded down). With this, transform rewrites all buckets starting from the data point with the lowest value of the date_histogram field. Let's assume you have data for one device coming in 5 days late and a daily histogram: transform would then rewrite the buckets for the last 5 days. Min is just a simple heuristic here; if for some reason a device sends data 100 days late, transform would rewrite 100 buckets. That's a corner case and a weakness for sure, but hopefully a rare one (it seems you experienced something like that, because you say "data comes in very late"). Note that this problem might also be caused by wrong dates; I would consider filtering those data points at ingest using some simple plausibility checks to mitigate it.
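Conceptually, the change detection roughly boils down to a search like the following sketch (placeholder index and field names, made-up checkpoint timestamps; not the exact request transform issues): restrict on the sync field to the window between the previous and the new checkpoint, and take the min of the date_histogram field to find the oldest bucket that has to be rewritten.

GET source-index/_search
{
  "size": 0,
  "query": {
    "range": {
      "ingest_time": {
        "gte": "2023-01-30T10:00:00Z",
        "lt": "2023-01-30T10:05:00Z"
      }
    }
  },
  "aggs": {
    "oldest_changed_bucket": {
      "min": {
        "field": "reading_time"
      }
    }
  }
}

Transform then rounds the min value down to the bucket interval and recomputes all buckets from that point onward.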

Long story short, I don't see a way to combine checkpoint alignment and ingest timestamps.

Having said that, what is the motivation behind your question?
I guess it is optimizing throughput. There might be other ways to improve your use case; if you share some more details I might be able to provide some suggestions. I assume you have already read our scaling guide.

Hi Hendrik,

Thx for the quick reply. The objective is more about reducing CPU load, and the underlying disk IOPS, by limiting the number of document updates. The throughput of the transforms is fine for now.
The complete story is that I initially started by using my "reading" date as the sync field, and CPU usage was lower, down to a point where it was barely noticeable that transforms were running. Of course, they quickly started to miss documents due to the transmission lag... After switching to the ingest timestamp as the sync field, CPU usage ramped up.

Some numbers:

  • 6 data nodes, which all have the transform role too
  • Data nodes have 20 CPUs (Xeon), 64 GB RAM, and automatic heap allocation (~31 GB allocated)
  • I run ~300 transforms, spread as ~50 per data node
  • CPU usage is ~10% per node without transforms, ~30% with transforms running

Performance is currently not an issue; we're just using some CPU. I'm thinking long term here: the service will scale in the future with more devices deployed, more data collected, more transforms, etc.

I can share a complete transform JSON if you need it.

Cheers,
David

Thanks for the reply. I would like to know what interval and frequency you are using for the transforms in question. Or feel free to share the complete JSON.

Thinking about the difference between aligned checkpoints and your setup, I can only imagine the problem I tried to describe: a document with a rather old timestamp (in the date_histogram field) arrives late and triggers re-creation of a lot of buckets. Reducing the frequency and/or increasing the delay might help.

What other optimizations do you use?
Did you disable _source?
It might also help to e.g. increase the refresh_interval on the transform destination indices.
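For example (placeholder names and illustrative values, adjust to your actual latency requirements), you could bump the frequency and delay via the transform update API and relax the refresh interval on the destination index:

POST _transform/<transform_id>/_update
{
  "frequency": "10m",
  "sync": {
    "time": {
      "field": "<ingest_time_field>",
      "delay": "10m"
    }
  }
}

PUT <dest_index>/_settings
{
  "index": {
    "refresh_interval": "30s"
  }
}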

I'm using 5m for both frequency and delay. The reason for the delay is that the destination indices have a 5m refresh interval too. _source isn't disabled on the destination; I'm using it for some other purposes. I've run some tests with the new synthetic source feature, but the fact that it flattens nested arrays (though I perfectly understand why it does that) is currently blocking us. Search was also slower with synthetic source enabled.

Here's a sample transform, for your reference. A few details:

  • metadata.readAt is the reading date
  • metadata.writtenAt is the ingest timestamp
  • metadata.uuid is each item's unique identifier
  • metadata.techno, metadata.type and metadata.token are other dimensions used for grouping
  • The gte range in the source query is to avoid running on the full historical data. As I'm doing lots of tests, this avoids long initial runs when the transform is reset
  • The scripted_metric aggregation is to get the latest document (part of the document actually) per bucket. Definitely looking forward to having transforms support top_hits with source filtering :slight_smile:
{
  "count": 1,
  "transforms": [
    {
      "id": "downsample-storcloud-metrics-snmp-hosts-15m",
      "source": {
        "index": [
          "storcloud-metrics"
        ],
        "query": {
          "bool": {
            "filter": [
              {
                "term": {
                  "metadata.techno": {
                    "value": "snmp"
                  }
                }
              },
              {
                "term": {
                  "metadata.type": {
                    "value": "hosts"
                  }
                }
              },
              {
                "range": {
                  "metadata.readAt": {
                    "gte": "2023-01-24T12:00:00",
                    "time_zone": "Europe/Paris"
                  }
                }
              }
            ]
          }
        }
      },
      "dest": {
        "index": "storcloud-downsampled-metrics-15m",
        "pipeline": "storcloud-downsampled-metrics"
      },
      "frequency": "5m",
      "sync": {
        "time": {
          "field": "metadata.writtenAt",
          "delay": "5m"
        }
      },
      "pivot": {
        "group_by": {
          "metadata.readAt": {
            "date_histogram": {
              "field": "metadata.readAt",
              "fixed_interval": "15m",
              "time_zone": "Europe/Paris"
            }
          },
          "metadata.token": {
            "terms": {
              "field": "metadata.token"
            }
          },
          "metadata.techno": {
            "terms": {
              "field": "metadata.techno"
            }
          },
          "metadata.type": {
            "terms": {
              "field": "metadata.type"
            }
          },
          "metadata.uuid": {
            "terms": {
              "field": "metadata.uuid"
            }
          }
        },
        "aggregations": {
          "latest": {
            "scripted_metric": {
              "init_script": """
          state.latest = [:];
          state.timestamp = 0;
        """,
              "map_script": """
          long timestamp = doc['metadata.readAt'].value.toInstant().toEpochMilli();

          if (timestamp > state.timestamp) {
            state.timestamp = timestamp;
            state.latest.inventory = params._source.inventory;
            state.latest.metadata = params._source.metadata;
          }
        """,
              "combine_script": "state",
              "reduce_script": """
          Map latest = null;
          long timestamp = 0;

          for (state in states) {
            if (state.timestamp > timestamp) {
              latest = state.latest;
              timestamp = state.timestamp;
            }
          }

          return latest;
        """
            }
          },
          "load_1m.min": {
            "min": {
              "field": "load_1m.value"
            }
          },
          "load_1m.max": {
            "max": {
              "field": "load_1m.value"
            }
          },
          "load_1m.avg": {
            "avg": {
              "field": "load_1m.value"
            }
          },
          "load_1m.sum": {
            "sum": {
              "field": "load_1m.value"
            }
          },
          "load_5m.min": {
            "min": {
              "field": "load_5m.value"
            }
          },
          "load_5m.max": {
            "max": {
              "field": "load_5m.value"
            }
          },
          "load_5m.avg": {
            "avg": {
              "field": "load_5m.value"
            }
          },
          "load_5m.sum": {
            "sum": {
              "field": "load_5m.value"
            }
          },
          "load_15m.min": {
            "min": {
              "field": "load_15m.value"
            }
          },
          "load_15m.max": {
            "max": {
              "field": "load_15m.value"
            }
          },
          "load_15m.avg": {
            "avg": {
              "field": "load_15m.value"
            }
          },
          "load_15m.sum": {
            "sum": {
              "field": "load_15m.value"
            }
          },
          "total_memory.min": {
            "min": {
              "field": "total_memory.value"
            }
          },
          "total_memory.max": {
            "max": {
              "field": "total_memory.value"
            }
          },
          "total_memory.avg": {
            "avg": {
              "field": "total_memory.value"
            }
          },
          "total_memory.sum": {
            "sum": {
              "field": "total_memory.value"
            }
          },
          "used_memory.min": {
            "min": {
              "field": "used_memory.value"
            }
          },
          "used_memory.max": {
            "max": {
              "field": "used_memory.value"
            }
          },
          "used_memory.avg": {
            "avg": {
              "field": "used_memory.value"
            }
          },
          "used_memory.sum": {
            "sum": {
              "field": "used_memory.value"
            }
          },
          "free_memory.min": {
            "min": {
              "field": "free_memory.value"
            }
          },
          "free_memory.max": {
            "max": {
              "field": "free_memory.value"
            }
          },
          "free_memory.avg": {
            "avg": {
              "field": "free_memory.value"
            }
          },
          "free_memory.sum": {
            "sum": {
              "field": "free_memory.value"
            }
          },
          "used_memory_percent.min": {
            "min": {
              "field": "used_memory_percent.value"
            }
          },
          "used_memory_percent.max": {
            "max": {
              "field": "used_memory_percent.value"
            }
          },
          "used_memory_percent.avg": {
            "avg": {
              "field": "used_memory_percent.value"
            }
          },
          "used_memory_percent.sum": {
            "sum": {
              "field": "used_memory_percent.value"
            }
          },
          "cpu_busy_percent.min": {
            "min": {
              "field": "cpu_busy_percent.value"
            }
          },
          "cpu_busy_percent.max": {
            "max": {
              "field": "cpu_busy_percent.value"
            }
          },
          "cpu_busy_percent.avg": {
            "avg": {
              "field": "cpu_busy_percent.value"
            }
          },
          "cpu_busy_percent.sum": {
            "sum": {
              "field": "cpu_busy_percent.value"
            }
          }
        }
      },
      "description": "Downsample metrics for snmp/hosts at 15m interval",
      "settings": {}
    }
  ]
}

David

Thanks,

Looks good. The latest scripted_metric aggregation could potentially be replaced by top_metrics. Is there a 1-to-many relation between uuid and techno, type and/or token? If it's 1:1, only group by uuid and use top_metrics for the others. At least in this transform some of them seem to be static due to the filter query. You could also set them using an ingest pipeline that you attach to dest.
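As a rough sketch (assuming you group only by uuid and the other dimensions are 1:1 with it), the latest part could look something like this with top_metrics:

"latest": {
  "top_metrics": {
    "metrics": [
      { "field": "metadata.token" },
      { "field": "metadata.techno" },
      { "field": "metadata.type" }
    ],
    "sort": {
      "metadata.readAt": "desc"
    }
  }
}

This only works for flat fields though; for copying whole objects like inventory out of _source you would still need the scripted_metric (or top_hits support).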

Thx for the reply.
Unfortunately, my uuid field is only guaranteed to be unique for a given techno, type and token combination, hence the grouping on all dimensions. It also makes the transform hash all of them into the generated document _id, which avoids duplicates across types due to the not-so-unique uuid (been there :frowning:)

I'll look into top_metrics again, thx for the hint.

David

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.