Transform checkpoints not optimized with date histogram

I am running a transform that groups by two fields: (1) a terms group on a keyword field (client_id) and (2) a date histogram on a date field (transaction_time). For sync, I am using a separate date field (updated_at) that is essentially the ingest timestamp, since my documents can arrive with data from the past.

What I am noticing is that during checkpoints the transform reprocesses a huge number of documents (~35M), even though between one checkpoint and the next (5 minutes) I only ingest around 200-300 new documents. My source index has around 50M documents in total.
Is it possible that checkpoints recalculate buckets based only on the first group_by field (client_id), without trying to optimize (limit) the queries on the second field (the date histogram)?

ES version: 8.10.0
Here is the output of the last checkpoint stats:

{
  "count": 1,
  "transforms": [
    {
      "id": "transactions-live-agg-by-client",
      "state": "indexing",
      "node": {
        "id": "M914386zRRmLILUGspRt2Q",
        "name": "instance-0000000007",
        "ephemeral_id": "R5lPMmXDRYCdVff2ZcdpKQ",
        "transport_address": "10.0.75.198:19938",
        "attributes": {}
      },
      "stats": {
        "pages_processed": 18571,
        "documents_processed": 375608855,
        "documents_indexed": 12650213,
        "documents_deleted": 0,
        "trigger_count": 12,
        "index_time_in_ms": 1786380,
        "index_total": 18543,
        "index_failures": 0,
        "search_time_in_ms": 110435299,
        "search_total": 18572,
        "search_failures": 0,
        "processing_time_in_ms": 441063,
        "processing_total": 18571,
        "delete_time_in_ms": 0,
        "exponential_avg_checkpoint_duration_ms": 12397282.195840033,
        "exponential_avg_documents_indexed": 1364656.9537854926,
        "exponential_avg_documents_processed": 39317239.46521342
      },
      "checkpointing": {
        "last": {
          "checkpoint": 9,
          "timestamp_millis": 1695243809080,
          "time_upper_bound_millis": 1695243689080
        },
        "next": {
          "checkpoint": 10,
          "position": {
            "indexer_position": {
              "transaction_time": 1688751300000,
              "client_id": "290"
            }
          },
          "checkpoint_progress": {
            "docs_indexed": 1170500,
            "docs_processed": 34601329
          },
          "timestamp_millis": 1695260935575,
          "time_upper_bound_millis": 1695260815575
        },
        "operations_behind": 21341,
        "changes_last_detected_at": 1695260935566,
        "last_search_time": 1695260935566
      },
      "health": {
        "status": "green"
      }
    }
  ]
}

Here is my transform config:

{
  "id": "transactions-live-agg-by-client",
  "authorization": {
    "roles": [
      "superuser"
    ]
  },
  "version": "10.0.0",
  "create_time": 1695144341498,
  "source": {
    "index": [
      "transactions-live-read"
    ],
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "transactions-live-agg-by-client"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "updated_at",
      "delay": "2m"
    }
  },
  "pivot": {
    "group_by": {
      "client_id": {
        "terms": {
          "field": "client_id"
        }
      },
      "transaction_time": {
        "date_histogram": {
          "field": "transaction_time",
          "fixed_interval": "5m"
        }
      }
    },
    "aggregations": {
      "total.count": {
        "value_count": {
          "field": "tracking_id"
        }
      },
      "total.amount": {
        "sum": {
          "field": "payment.amount"
        }
      },
      "status": {
        "terms": {
          "field": "status"
        },
        "aggregations": {
          "amount": {
            "sum": {
              "field": "payment.amount"
            }
          },
          "count": {
            "value_count": {
              "field": "tracking_id"
            }
          }
        }
      },
      "decision": {
        "terms": {
          "field": "risk.decision"
        },
        "aggregations": {
          "amount": {
            "sum": {
              "field": "payment.amount"
            }
          },
          "count": {
            "value_count": {
              "field": "tracking_id"
            }
          }
        }
      }
    }
  },
  "settings": {
    "max_page_search_size": 10000
  }
}

After investigating a bit more and looking at PR #63315, my assumption is that the transform takes the min and max of the documents ingested after the last checkpoint and recomputes everything in between.
For example, if I have 2 new documents whose group_by timestamp field is 2021-01-01 00:00 and 2023-01-01 00:00 respectively, the checkpoint will reprocess everything between those two timestamps. That seems a bit odd to me; ideally it should only recompute the buckets that changed. So if my date histogram has a 5-minute interval, it should only reprocess documents that fall within +/- 5 minutes of the timestamps of the changed documents. What am I missing here?

You explained it well. In your case, one or a few documents that update a very old bucket cause a recalculation of many other buckets, because change detection uses min and max aggregations to decide which buckets need recalculation. This is a special case; for most use cases this works well, because the update timestamp and the timestamp used for the date_histogram don't differ that much. If, as in your use case, it is common to update old transactions, that unfortunately causes a lot of re-computation.
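
Roughly (a simplified sketch, not the exact internal request the transform sends), change detection is a search that filters on the sync field (updated_at) for the new checkpoint window and takes min and max of the group_by date field (transaction_time); the timestamps below are just the time_upper_bound_millis values from your stats output:

# simplified sketch of the change detection search, not the exact internal request
POST transactions-live-read/_search
{
  "size": 0,
  "query": {
    "range": {
      "updated_at": {
        "gte": 1695243689080,
        "lt": 1695260815575
      }
    }
  },
  "aggs": {
    "changed_min": { "min": { "field": "transaction_time" } },
    "changed_max": { "max": { "field": "transaction_time" } }
  }
}

Everything between that min and max (expanded to the 5-minute bucket boundaries) is then recalculated, restricted to the client_ids that also changed.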

Yes, ideally it should. However, this is hard to get right. Change detection needs to find out which buckets to update; instead of min/max, one would need a date histogram aggregation to collect the changes. Even more complex is the query side, as the transform needs to create a query that narrows the search to those changes. With min/max a simple range query works well; with a histogram one would need a complex query containing a range clause for every bucket that has changed. I think for most use cases this would be slower than the simple min/max heuristic. It is unclear which strategy is more effective, as it depends on the data.
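
To illustrate the query side (again just a sketch, using the 2021/2023 timestamps from your example as bucket boundaries): with min/max the recalculation is narrowed by a single range clause on transaction_time, whereas a per-bucket strategy would need something like a bool query with one range per changed 5-minute bucket:

# hypothetical per-bucket filter that a histogram-based strategy would need
{
  "bool": {
    "should": [
      { "range": { "transaction_time": { "gte": 1609459200000, "lt": 1609459500000 } } },
      { "range": { "transaction_time": { "gte": 1672531200000, "lt": 1672531500000 } } }
    ],
    "minimum_should_match": 1
  }
}

With many scattered updates this grows to hundreds or thousands of clauses.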

Thanks for the insights. I was thinking that the optimization could try to create just a few date ranges to query (depending on what a reasonable number of clauses in the filter query is), rather than one range per bucket: it could build a few sliding windows that together cover all of the changed buckets. I am probably not thinking of all the possible edge cases here, but in my case that would go from processing 10-15M documents to a few hundred. This additional optimization could kick in when the number of documents to process is far higher than the number of newly ingested documents, or it could be a setting on the transform itself.

Considering the way it currently works, do you have any recommendations for my use case? I am ingesting around 3M transactions per month, and sometimes an update to a transaction can arrive up to 6 months later (which causes the transform to reprocess 18M documents in the worst case). I have the following 2 ideas so far:

  1. Whenever an update arrives for a transaction that is too old, the application manually calculates the bucket it falls into and updates the transform destination index directly (a sketch of this follows below the list). The sync field (updated_at) is not updated on that document, so the transform will not treat it as changed.

  2. Have smaller source indices (maybe one per month), so if an update arrives for an old transaction, the transform only reprocesses transactions from that month in the worst case. My question here is whether it is even possible to make a transform part of an index template, so that whenever an index is rolled over, the new index gets an identical transform sending data to the same destination index?
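
For idea 1, this is roughly the per-bucket recomputation I have in mind (a sketch only; the client_id and 5-minute bucket boundaries are example values taken from the indexer position in the stats above, and the aggregation names are illustrative). The application would run it against the source index and then overwrite the matching document in transactions-live-agg-by-client, which it can look up by client_id and transaction_time since the transform stores the group_by values in the destination documents:

# example values: one client (290) and one 5-minute bucket, boundaries aligned to the histogram interval
POST transactions-live-read/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "term": { "client_id": "290" } },
        { "range": { "transaction_time": { "gte": 1688751300000, "lt": 1688751600000 } } }
      ]
    }
  },
  "aggs": {
    "total_count": { "value_count": { "field": "tracking_id" } },
    "total_amount": { "sum": { "field": "payment.amount" } },
    "status": {
      "terms": { "field": "status" },
      "aggs": {
        "amount": { "sum": { "field": "payment.amount" } },
        "count": { "value_count": { "field": "tracking_id" } }
      }
    },
    "decision": {
      "terms": { "field": "risk.decision" },
      "aggs": {
        "amount": { "sum": { "field": "payment.amount" } },
        "count": { "value_count": { "field": "tracking_id" } }
      }
    }
  }
}

The response buckets then map onto the fields the transform would otherwise have produced (total.*, status.*, decision.*).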

Your ideas around the optimization sound useful. It will be challenging to get the partitioning right, and it must work in a generic way. However, I don't see the capacity to implement it at the moment.

Regarding your current transform: how many client_ids do you have? The recalculation you see should only happen for the client_ids that changed. However, there is no analysis of co-occurrence: if an old transaction of client A gets updated and, within the same 5-minute window, updates for clients B and C also arrive, the transform will still recalculate entries for B and C (it at least won't recalculate clients D and E). It might help to lower the frequency setting, i.e. checkpoint more often, to reduce the chance of hitting such coincidental updates. You could also create separate transforms partitioned by client_id; it should be possible to let them write into the same destination index.
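
As a sketch of that split (the transform id and client_id values are hypothetical, and the pivot is shortened here; in practice it would be identical to your existing one), each transform would get a terms filter in its source query while keeping the same dest index:

# hypothetical transform id and client_id values; pivot shortened
PUT _transform/transactions-live-agg-by-client-part1
{
  "source": {
    "index": ["transactions-live-read"],
    "query": {
      "terms": { "client_id": ["1", "2", "3"] }
    }
  },
  "dest": { "index": "transactions-live-agg-by-client" },
  "frequency": "5m",
  "sync": { "time": { "field": "updated_at", "delay": "2m" } },
  "pivot": {
    "group_by": {
      "client_id": { "terms": { "field": "client_id" } },
      "transaction_time": { "date_histogram": { "field": "transaction_time", "fixed_interval": "5m" } }
    },
    "aggregations": {
      "total.count": { "value_count": { "field": "tracking_id" } },
      "total.amount": { "sum": { "field": "payment.amount" } }
    }
  }
}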

What I am also wondering: in the aggregation part you have 2 terms aggregations, for status and risk.decision. What if you create separate transforms for those? In that case status and risk.decision would go into the group_by part. This might help reduce the amount of recalculation.
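
As a sketch (hypothetical transform and destination index names, aggregations shortened), status would become a third group_by and the nested terms aggregation would go away; a second, analogous transform would do the same for risk.decision:

# hypothetical names; aggregations shortened
PUT _transform/transactions-live-agg-by-client-status
{
  "source": { "index": ["transactions-live-read"] },
  "dest": { "index": "transactions-live-agg-by-client-status" },
  "frequency": "5m",
  "sync": { "time": { "field": "updated_at", "delay": "2m" } },
  "pivot": {
    "group_by": {
      "client_id": { "terms": { "field": "client_id" } },
      "transaction_time": { "date_histogram": { "field": "transaction_time", "fixed_interval": "5m" } },
      "status": { "terms": { "field": "status" } }
    },
    "aggregations": {
      "count": { "value_count": { "field": "tracking_id" } },
      "amount": { "sum": { "field": "payment.amount" } }
    }
  }
}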

Regarding ILM: No, unfortunately there is no such integration. The idea of having a transform at rollover sounds interesting.

Thanks @Hendrik_Muhs for the ideas. There are currently around 100 client_ids, but this number will grow. I also like the idea of separating the aggregations into different transforms. For now we are going with the approach of handling these cases separately in the application, manually recalculating the affected buckets and updating the transform destination, since that seems like the best option for our use case. I will report back if we run into any problems with that.
