I am running a transform that groups by 2 fields: 1. terms on a keyword field (client_id), 2. date histogram on a date field (transaction_time). For sync, I am using a separate date field (updated_at) that is basically the ingest timestamp, since my documents can arrive with data from the past.
What I am noticing is that during checkpoints, the transform is reprocessing a huge number of documents (35M) even though from one checkpoint to another (5 mins), I am only ingesting around 200-300 new documents. My source index has around 50M documents total.
Is it possible that the checkpoints are recalculating buckets based on only the first field in group by (client_id), and not trying to optimize (limit queries) on the second field (date histogram)?
ES version: 8.10.0
Here is the output of the last checkpoint stats:
{
"count": 1,
"transforms": [
{
"id": "transactions-live-agg-by-client",
"state": "indexing",
"node": {
"id": "M914386zRRmLILUGspRt2Q",
"name": "instance-0000000007",
"ephemeral_id": "R5lPMmXDRYCdVff2ZcdpKQ",
"transport_address": "10.0.75.198:19938",
"attributes": {}
},
"stats": {
"pages_processed": 18571,
"documents_processed": 375608855,
"documents_indexed": 12650213,
"documents_deleted": 0,
"trigger_count": 12,
"index_time_in_ms": 1786380,
"index_total": 18543,
"index_failures": 0,
"search_time_in_ms": 110435299,
"search_total": 18572,
"search_failures": 0,
"processing_time_in_ms": 441063,
"processing_total": 18571,
"delete_time_in_ms": 0,
"exponential_avg_checkpoint_duration_ms": 12397282.195840033,
"exponential_avg_documents_indexed": 1364656.9537854926,
"exponential_avg_documents_processed": 39317239.46521342
},
"checkpointing": {
"last": {
"checkpoint": 9,
"timestamp_millis": 1695243809080,
"time_upper_bound_millis": 1695243689080
},
"next": {
"checkpoint": 10,
"position": {
"indexer_position": {
"transaction_time": 1688751300000,
"client_id": "290"
}
},
"checkpoint_progress": {
"docs_indexed": 1170500,
"docs_processed": 34601329
},
"timestamp_millis": 1695260935575,
"time_upper_bound_millis": 1695260815575
},
"operations_behind": 21341,
"changes_last_detected_at": 1695260935566,
"last_search_time": 1695260935566
},
"health": {
"status": "green"
}
}
]
}
Here is my transform config:
{
"id": "transactions-live-agg-by-client",
"authorization": {
"roles": [
"superuser"
]
},
"version": "10.0.0",
"create_time": 1695144341498,
"source": {
"index": [
"transactions-live-read"
],
"query": {
"match_all": {}
}
},
"dest": {
"index": "transactions-live-agg-by-client"
},
"frequency": "5m",
"sync": {
"time": {
"field": "updated_at",
"delay": "2m"
}
},
"pivot": {
"group_by": {
"client_id": {
"terms": {
"field": "client_id"
}
},
"transaction_time": {
"date_histogram": {
"field": "transaction_time",
"fixed_interval": "5m"
}
}
},
"aggregations": {
"total.count": {
"value_count": {
"field": "tracking_id"
}
},
"total.amount": {
"sum": {
"field": "payment.amount"
}
},
"status": {
"terms": {
"field": "status"
},
"aggregations": {
"amount": {
"sum": {
"field": "payment.amount"
}
},
"count": {
"value_count": {
"field": "tracking_id"
}
}
}
},
"decision": {
"terms": {
"field": "risk.decision"
},
"aggregations": {
"amount": {
"sum": {
"field": "payment.amount"
}
},
"count": {
"value_count": {
"field": "tracking_id"
}
}
}
}
}
},
"settings": {
"max_page_search_size": 10000
}
}