I want to calculate the indexing speed of documents in primary shards in my data streams.
When monitoring is enabled, I can see the index rate for each index, but I can't see it for any of the data streams in my clusters. I could calculate the necessary data in a third-party application such as Grafana or Prometheus, but I want to shift the calculation load to Elasticsearch and extract clean data.
The information I need is located in the data stream .ds-.monitoring-es-8-mb-*.
I want to create a continuous transform over the fields index_stats.primaries.indexing.index_total, elasticsearch.cluster.name and index_stats.index, and write the cluster name, data stream name, timestamp and indexing speed to a new index.
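To make the target output concrete, here is a minimal Python sketch of the calculation I would otherwise do outside Elasticsearch. It is only an illustration under assumptions: the function names, the cluster name "prod" and the sample index name are hypothetical, and taking the maximum counter per timestamp simply mirrors the max aggregation in my transform below.

```python
import re
from collections import defaultdict

# Same idea as the Painless script in the transform: strip the
# "-yyyy.MM.dd-NNNNNN" suffix of a backing index to get the data stream name.
BACKING_INDEX = re.compile(r"(.+)-\d{4}\.\d{2}\.\d{2}-\d+")


def datastream_name(index_name):
    """Return the name without the date/generation suffix, or the name unchanged."""
    m = BACKING_INDEX.fullmatch(index_name)
    return m.group(1) if m else index_name


def indexing_rates(samples):
    """samples: iterable of (cluster, index, epoch_seconds, index_total).

    Returns rows of (cluster, datastream, epoch_seconds, docs_per_second).
    """
    # Group by (cluster, data stream); keep the max counter value per timestamp.
    series = defaultdict(dict)
    for cluster, index, ts, total in samples:
        key = (cluster, datastream_name(index))
        series[key][ts] = max(total, series[key].get(ts, total))

    rows = []
    for (cluster, stream), points in series.items():
        ordered = sorted(points.items())
        # Rate = counter difference between consecutive samples / elapsed seconds.
        for (t0, v0), (t1, v1) in zip(ordered, ordered[1:]):
            rows.append((cluster, stream, t1, (v1 - v0) / (t1 - t0)))
    return rows


# Hypothetical sample data: one backing index sampled every 10 seconds.
samples = [
    ("prod", ".ds-logs-app-2024.06.01-000001", 0, 1000),
    ("prod", ".ds-logs-app-2024.06.01-000001", 10, 1500),
    ("prod", ".ds-logs-app-2024.06.01-000001", 20, 1700),
]
rates = indexing_rates(samples)
```

With this sample input, the two resulting rows describe 50 and 20 documents per second for the hypothetical data stream. This is the shape of data I would like the transform to write to the destination index.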
Please review my logic and point out any errors. Maybe someone has already done something similar, or used another approach to write indexing speed to a new index (or data stream). It looks like I'm either doing something wrong or trying to work with a hidden system index that Elasticsearch doesn't make available for this kind of processing.
My code snippet:
PUT _transform/datastream_indexing_rate_transform
{
  "source": {
    "index": ".monitoring-es-8-mb-*",
    "query": {
      "bool": {
        "filter": [
          { "exists": { "field": "index_stats.primaries.indexing.index_total" } },
          { "exists": { "field": "elasticsearch.cluster.name" } },
          { "exists": { "field": "index_stats.index" } }
        ]
      }
    }
  },
  "dest": {
    "index": "datastream_indexing_rate_summary"
  },
  "pivot": {
    "group_by": {
      "@timestamp": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "5s"
        }
      },
      "cluster_name": {
        "terms": {
          "field": "elasticsearch.cluster.name"
        }
      },
      "datastream": {
        "terms": {
          "script": {
            "source": """
              def m = /(.+)-\\d{4}\\.\\d{2}\\.\\d{2}-\\d+$/.matcher(doc['index_stats.index'].value);
              if (m.matches()) {
                return m.group(1);
              } else {
                return doc['index_stats.index'].value;
              }
            """,
            "lang": "painless"
          }
        }
      }
    },
    "aggregations": {
      "total_indexed": {
        "max": {
          "field": "index_stats.primaries.indexing.index_total"
        }
      },
      "indexing_rate": {
        "derivative": {
          "buckets_path": "total_indexed",
          "unit": "1s"
        }
      }
    }
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "30s"
    }
  },
  "settings": {
    "max_page_search_size": 5000
  }
}
At this point I get the following error:
{
  "error": {
    "root_cause": [
      {
        "type": "validation_exception",
        "reason": "Validation Failed: 1: Failed to test query, received status: SERVICE_UNAVAILABLE;"
      }
    ],
    "type": "validation_exception",
    "reason": "Validation Failed: 1: Failed to test query, received status: SERVICE_UNAVAILABLE;",
    "caused_by": {
      "type": "action_request_validation_exception",
      "reason": "Validation Failed: 1: derivative aggregation [indexing_rate] must have a histogram, date_histogram or auto_date_histogram as parent;"
    }
  },
  "status": 400
}
P.S. For my tests I'm using Elasticsearch 8.14.