I'm currently running multiple transforms on the same source index. The source index is managed by ILM, with a lifecycle policy that shrinks the index after 30 days.
These are the stats for one of my transforms:
{
  "count": 1,
  "transforms": [
    {
      "stats": {
        "pages_processed": 123799,
        "documents_processed": 677763906,
        "documents_indexed": 17370454,
        "documents_deleted": 0,
        "trigger_count": 58542,
        "index_time_in_ms": 10020554,
        "index_total": 35767,
        "index_failures": 0,
        "search_time_in_ms": 590799065,
        "search_total": 123799,
        "search_failures": 1,
        "processing_time_in_ms": 379578,
        "processing_total": 123799,
        "delete_time_in_ms": 109048,
        "exponential_avg_checkpoint_duration_ms": 33600.78815398271,
        "exponential_avg_documents_indexed": 11158.586744618027,
        "exponential_avg_documents_processed": 686544.2523900922
      },
      "checkpointing": {
        "last": {
          "checkpoint": 22545,
          "timestamp_millis": 1677029132567,
          "time_upper_bound_millis": 1677028532567
        },
        "operations_behind": 3220475,
        "changes_last_detected_at": 1677029131139,
        "last_search_time": 1677079799066
      }
    }
  ]
}
As you can see, search_time_in_ms is far larger than the index and processing times. The same is true for every transform that runs on this source data. I have a monthly, a quarterly, and a yearly transform, plus a latest transform, all using the same source.
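To put the numbers in perspective, the per-operation averages can be derived from the stats above. This is a minimal Python sketch that uses only values copied from that output; the variable and function names are my own and carry no meaning in Elasticsearch.

```python
# Values copied from the transform stats shown above.
stats = {
    "search_time_in_ms": 590799065,
    "search_total": 123799,
    "index_time_in_ms": 10020554,
    "index_total": 35767,
    "processing_time_in_ms": 379578,
    "processing_total": 123799,
    "documents_processed": 677763906,
    "documents_indexed": 17370454,
}


def average(total_ms: int, count: int) -> float:
    """Average time per operation, in milliseconds."""
    return total_ms / count


avg_search_ms = average(stats["search_time_in_ms"], stats["search_total"])
avg_index_ms = average(stats["index_time_in_ms"], stats["index_total"])
avg_processing_ms = average(
    stats["processing_time_in_ms"], stats["processing_total"]
)
# How many source documents are read for each document written.
docs_per_indexed = stats["documents_processed"] / stats["documents_indexed"]

print(f"avg search:     {avg_search_ms:.0f} ms")
print(f"avg index:      {avg_index_ms:.0f} ms")
print(f"avg processing: {avg_processing_ms:.2f} ms")
print(f"docs processed per doc indexed: {docs_per_indexed:.1f}")
```

That works out to roughly 4.8 seconds per search, against about 0.28 seconds per index operation and about 3 ms of processing per page.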
How can I improve performance? Do I need to increase shard size for my source data? Because of the lifecycle policy, the data is shrunk to 1 primary shard and 1 replica shard. Site users can still request this shrunk data, although that doesn't happen often.
Here is my yearly transform (my quarterly transform has the same structure):
{
  "id": "sales_last_year_transform",
  "source": {
    "index": [
      "daily_sales"
    ],
    "query": {
      "bool": {
        "filter": [
          {
            "range": {
              "date": {
                "gte": "now-365d",
                "lt": "now"
              }
            }
          }
        ]
      }
    }
  },
  "dest": {
    "index": "sales_last_year"
  },
  "sync": {
    "time": {
      "field": "date",
      "delay": "10m"
    }
  },
  "pivot": {
    "group_by": {
      "product_id": {
        "terms": {
          "field": "product.id"
        }
      }
    },
    "aggregations": {
      "price": {
        "percentiles": {
          "field": "offer.price",
          "percents": [
            50
          ]
        }
      },
      "sales": {
        "sum": {
          "field": "sales"
        }
      },
      "revenue": {
        "bucket_script": {
          "buckets_path": {
            "sales": "sales",
            "price": "price.50"
          },
          "script": "params.sales * params.price"
        }
      }
    }
  },
  "settings": {
    "max_page_search_size": 5000
  },
  "retention_policy": {
    "time": {
      "field": "date",
      "max_age": "365d"
    }
  }
}
And here is my monthly transform, which is somewhat more expensive because it also runs two scripted_metric aggregations:
{
  "source": {
    "index": [
      "daily_sales"
    ],
    "query": {
      "bool": {
        "filter": [
          {
            "range": {
              "date": {
                "gte": "now-30d",
                "lt": "now"
              }
            }
          }
        ]
      }
    }
  },
  "dest": {
    "index": "aggregated_sales_last_month"
  },
  "sync": {
    "time": {
      "field": "date",
      "delay": "10m"
    }
  },
  "pivot": {
    "group_by": {
      "product_id": {
        "terms": {
          "field": "product.id"
        }
      }
    },
    "aggregations": {
      "latest_doc": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": "def current_date = doc['date'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest) {
              state.timestamp_latest = current_date;
              state.last_doc = new HashMap(params['_source']['product']);
            }",
          "combine_script": "return state",
          "reduce_script": "
            def last_doc = '';
            def timestamp_latest = 0L;
            for (s in states) {
              if (s.timestamp_latest > (timestamp_latest)) {
                timestamp_latest = s.timestamp_latest;
                last_doc = s.last_doc;
              }
            }
            return last_doc"
        }
      },
      "sellers": {
        "scripted_metric": {
          "init_script": "state.sellers = new HashSet();",
          "map_script": "
            if (doc.containsKey('offer.seller.name') && !doc['offer.seller.name'].empty) {
              state.sellers.add(doc['offer.seller.name'].getValue());
            }",
          "combine_script": "return state",
          "reduce_script": "
            Set temp = new HashSet();
            for (s in states) {
              temp.addAll(s.sellers);
            }
            return temp;"
        }
      },
      "price": {
        "percentiles": {
          "field": "offer.price",
          "percents": [
            50
          ]
        }
      },
      "delivery_time": {
        "percentiles": {
          "field": "offer.promised_delivery_days",
          "percents": [
            50
          ]
        }
      },
      "sales": {
        "sum": {
          "field": "sales"
        }
      },
      "revenue": {
        "bucket_script": {
          "buckets_path": {
            "sales": "sales",
            "price": "price.50"
          },
          "script": "params.sales * params.price"
        }
      }
    }
  },
  "settings": {
    "max_page_search_size": 500
  },
  "retention_policy": {
    "time": {
      "field": "date",
      "max_age": "30d"
    }
  }
}
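For readers less familiar with scripted_metric, this is what the latest_doc aggregation above is meant to compute, written as a plain-Python imitation of its init, map, and reduce steps. It is only a sketch of the logic, not Painless and not how Elasticsearch executes it. The function names, the two "shards", and the sample documents are made up for illustration, and dates are simplified to epoch milliseconds.

```python
from typing import Any, Dict, List

State = Dict[str, Any]


def init_state() -> State:
    # Mirrors init_script: one fresh state per shard.
    return {"timestamp_latest": 0, "last_doc": ""}


def map_doc(state: State, doc: Dict[str, Any]) -> None:
    # Mirrors map_script: runs once per matching document and keeps
    # a copy of the product object from the newest document seen.
    if doc["date"] > state["timestamp_latest"]:
        state["timestamp_latest"] = doc["date"]
        state["last_doc"] = dict(doc["product"])


def reduce_states(states: List[State]) -> Any:
    # Mirrors reduce_script: picks the newest product across shard states.
    last_doc: Any = ""
    timestamp_latest = 0
    for s in states:
        if s["timestamp_latest"] > timestamp_latest:
            timestamp_latest = s["timestamp_latest"]
            last_doc = s["last_doc"]
    return last_doc


# Hypothetical sample data: two shards holding documents for one product_id.
shard_a = [
    {"date": 1000, "product": {"id": "p1", "name": "old"}},
    {"date": 3000, "product": {"id": "p1", "name": "newest"}},
]
shard_b = [
    {"date": 2000, "product": {"id": "p1", "name": "middle"}},
]

states = []
for shard in (shard_a, shard_b):
    state = init_state()
    for doc in shard:
        map_doc(state, doc)
    states.append(state)  # combine_script simply returns the state

latest = reduce_states(states)
print(latest)
```

The point of the sketch is that the map step runs for every source document in the 30-day range, once per product bucket, before anything is reduced.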