Hi, I'm facing an issue with an Elasticsearch transform that can't keep up with a continuous stream of high ingestion (~100k documents per minute) into the netflow-read-* source index, plus occasional backlogs (e.g. 10M documents in 2 hours). The transform fails with a TooManyBucketsException (65,536 bucket limit exceeded) during continuous processing when the backlog grows. I need help scaling this without increasing search.max_buckets, while keeping the aggregation working.
Hi Lucas,
Have you tried applying the tips from the Working with Transforms at Scale document? It may prove useful.
If you'd like, you can also provide your transform's configuration JSON. That would make it much easier to help with this case.
This is my config; for calendarInterval I pass one of ("1m" | "1h" | "1d" | "1w" | "1M" | "1y"):
await this.elastic.transform.putTransform({
transform_id: `${destIndex}-aggregation`,
body: {
source: {
index: `${this.indexName}-*`,
},
dest: { index: destIndex },
pivot: {
group_by: {
interval: {
date_histogram: {
field: "firstInUtc",
calendar_interval: calendarInterval,
}
},
routerIp: {
terms: {
field: "ip_router.keyword"
},
},
},
aggregations: {
"group_by_proto": {
"terms": { "field": "proto", missing: -1 },
"aggs": {
"group_by_input_snmp": {
"terms": { "field": "input_snmp", missing: -1 },
"aggs": {
"group_by_output_snmp": {
"terms": { "field": "output_snmp", missing: -1 },
"aggs": {
"group_by_src_ip": {
"terms": { "field": "src_ip.keyword", missing: '0.0.0.0' },
"aggs": {
"group_by_dst_ip": {
"terms": { "field": "dst_ip.keyword", missing: '0.0.0.0' },
"aggs": {
"group_by_src_port": {
"terms": { "field": "src_port", missing: -1 },
"aggs": {
"group_by_dst_port": {
"terms": { "field": "dst_port", missing: -1 },
"aggs": {
"total_in_bytes": {
"sum": { "field": "in_bytes" }
},
"total_in_packets": {
"sum": { "field": "in_packets" }
},
"total_out_bytes": {
"sum": { "field": "out_bytes" }
},
"total_out_packets": {
"sum": { "field": "out_packets" }
}
}
}
}
}
}
}
}
}
}
}
}
}
}
},
"group_by_input_snmp": {
"terms": { "field": "input_snmp", missing: -1 },
"aggs": {
"group_by_output_snmp": {
"terms": { "field": "output_snmp", missing: -1 },
"aggs": {
"group_by_src_asn": {
"terms": { "field": "src_as", missing: -1 },
"aggs": {
"group_by_dst_asn": {
"terms": { "field": "dst_as", missing: -1 },
"aggs": {
"total_in_bytes": {
"sum": { "field": "in_bytes" }
},
"total_in_packets": {
"sum": { "field": "in_packets" }
},
"total_out_bytes": {
"sum": { "field": "out_bytes" }
},
"total_out_packets": {
"sum": { "field": "out_packets" }
}
}
}
}
}
}
}
}
}
},
},
sync: {
time: {
field: "firstInUtc",
}
},
frequency: "1s",
settings: {
max_page_search_size: 200,
docs_per_second: 2000
}
}
});
Recently this error was generated. I believed it was because of storage space, but when the source index started receiving data again, the transform did not return to normal. I wanted to know: if the transform stops while the source index keeps receiving data, can it read all of the pending data once it comes back? For example, if it stops for 10 hours and during that time the source index keeps receiving data, can it catch up when it returns?
{
"count": 1,
"transforms": [
{
"id": "netflow-minute-aggregation",
"state": "failed",
"reason": "task encountered more than 10 failures; latest failure: Search rejected due to missing shards [[netflow-read-2025-3-8][0]]. Consider using `allow_partial_search_results` setting to bypass this error.",
"node": {
"id": "NkSdAIdmQQuSo_ymQ2CUcA",
"name": "teste-netflow",
"ephemeral_id": "08uMYPimSkKNjusNivggmw",
"transport_address": "127.0.0.1:9300",
"attributes": {}
},
"stats": {
"pages_processed": 9442,
"documents_processed": 60009003,
"documents_indexed": 2363,
"documents_deleted": 0,
"trigger_count": 141271,
"index_time_in_ms": 151140,
"index_total": 2361,
"index_failures": 0,
"search_time_in_ms": 1254233,
"search_total": 9442,
"search_failures": 0,
"processing_time_in_ms": 35906,
"processing_total": 9442,
"delete_time_in_ms": 0,
"exponential_avg_checkpoint_duration_ms": 1119.67134345541,
"exponential_avg_documents_indexed": 1.0000000000000002,
"exponential_avg_documents_processed": 28928.415916676517
},
"checkpointing": {
"last": {
"checkpoint": 2361,
"timestamp_millis": 1741402800400,
"time_upper_bound_millis": 1741402740000
},
"operations_behind": 1045828,
"changes_last_detected_at": 1741402800397,
"last_search_time": 1741402803398
},
"health": {
"status": "red",
"issues": [
{
"type": "transform_task_failed",
"issue": "Transform task state is [failed]",
"details": "task encountered more than 10 failures; latest failure: Search rejected due to missing shards [[netflow-read-2025-3-8][0]]. Consider using `allow_partial_search_results` setting to bypass this error.",
"count": 1,
"first_occurrence": 1741404854917
}
]
}
}
]
}
I wanted to know: if the transform stops while the source index keeps receiving data, can it read all of the pending data once it comes back?
In general yes, it should be able to pick up where it left off, although there have been cases where some data was missed because of ingest delays. Increasing sync.time.delay could help with that.
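For example, in the config you create with putTransform (a rough sketch based on your call above; the 120s delay is only an illustrative value, tune it to how far behind your ingest can run):

sync: {
  time: {
    field: "firstInUtc",
    // Illustrative value: wait this long before each checkpoint queries a
    // time range, so late-arriving netflow documents are already indexed
    // and are not skipped by the checkpoint.
    delay: "120s",
  }
},

Also note that since the task is currently in the failed state it will not resume on its own: once the underlying issue (the missing netflow-read-2025-3-8 shard / disk space) is resolved, you have to force-stop the transform and start it again, and it will then continue from its last checkpoint.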
Could you also try:
- Increasing max_page_search_size to 500 or even 1000
- Decreasing the frequency by changing the frequency setting from 1s to 5s (see the sketch after this list)?
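Both of these can be changed on the existing transform with the update transform API. A rough sketch using the same client as in your code (assuming a 7.x-style client where the request body goes under body; the exact values are just examples to tune):

// Sketch: fewer, larger checkpoints, and larger search pages per checkpoint.
await this.elastic.transform.updateTransform({
  transform_id: `${destIndex}-aggregation`,
  body: {
    frequency: "5s",                // check for new data every 5s instead of every 1s
    settings: {
      max_page_search_size: 1000,   // more buckets per composite-aggregation page (default 500)
      docs_per_second: 2000,        // keep (or raise) your existing throttle
    },
  },
});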