Indexing rate for data streams

I want to calculate the indexing rate of documents in the primary shards of my data streams.

When monitoring is enabled, I can see the index rate for each index, but I can't get this for the data streams in my clusters. I could calculate the necessary data in a third-party application such as Grafana or Prometheus, but I want to shift the calculation load to Elasticsearch and extract clean, ready-to-use data.

The information I need is located in the backing indices of the monitoring data stream, .ds-.monitoring-es-8-mb-*
I want to create a continuous transform over the fields index_stats.primaries.indexing.index_total, elasticsearch.cluster.name, and index_stats.index, and write the cluster name, data stream name, timestamp, and indexing rate to a new index.

Please look at my logic and the errors. Maybe someone has already done something similar, or used another solution to write the indexing rate to a new index (or data stream). It looks like I'm doing something wrong, or trying to operate on a hidden system index that Elasticsearch does not make available for further processing.
My code snippet:

PUT _transform/datastream_indexing_rate_transform
{
  "source": {
    "index": ".monitoring-es-8-mb-*",
    "query": {
      "bool": {
        "filter": [
          { "exists": { "field": "index_stats.primaries.indexing.index_total" } },
          { "exists": { "field": "elasticsearch.cluster.name" } },
          { "exists": { "field": "index_stats.index" } }
        ]
      }
    }
  },
  "dest": {
    "index": "datastream_indexing_rate_summary"
  },
  "pivot": {
    "group_by": {
      "@timestamp": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "5s"
        }
      },
      "cluster_name": {
        "terms": {
          "field": "elasticsearch.cluster.name"
        }
      },
      "datastream": {
        "terms": {
          "script": {
            "source": """
              def m = /(.+)-\d{4}\.\d{2}\.\d{2}-\d+$/.matcher(doc['index_stats.index'].value);
              if (m.matches()) {
                  return m.group(1);
              } else {
                  return doc['index_stats.index'].value;
              }
            """,
            "lang": "painless"
          }
        }
      }
    },
    "aggregations": {
      "total_indexed": {
        "max": {
          "field": "index_stats.primaries.indexing.index_total"
        }
      },
      "indexing_rate": {
        "derivative": {
          "buckets_path": "total_indexed",
          "unit": "1s"
        }
      }
    }
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "30s"
    }
  },
  "settings": {
    "max_page_search_size": 5000
  }
}

At this point I get the following error:

{
  "error": {
    "root_cause": [
      {
        "type": "validation_exception",
        "reason": "Validation Failed: 1: Failed to test query, received status: SERVICE_UNAVAILABLE;"
      }
    ],
    "type": "validation_exception",
    "reason": "Validation Failed: 1: Failed to test query, received status: SERVICE_UNAVAILABLE;",
    "caused_by": {
      "type": "action_request_validation_exception",
      "reason": "Validation Failed: 1: derivative aggregation [indexing_rate] must have a histogram, date_histogram or auto_date_histogram as parent;"
    }
  },
  "status": 400
}

PS: for my tests I'm using Elasticsearch v8.14.

Transforms do not support derivative. Transforms use the composite aggregation, which does not support most pipeline aggregations, derivative included: Composite aggregation | Elasticsearch Guide [8.17] | Elastic
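For comparison, here is a sketch of an ad-hoc search where derivative does work, because there the date_histogram is a real parent aggregation rather than a composite group_by source (field names taken from your snippet above, untested):

POST .monitoring-es-8-mb-*/_search
{
  "size": 0,
  "aggs": {
    "per_interval": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "5s"
      },
      "aggs": {
        "total_indexed": {
          "max": { "field": "index_stats.primaries.indexing.index_total" }
        },
        "indexing_rate": {
          "derivative": { "buckets_path": "total_indexed", "unit": "1s" }
        }
      }
    }
  }
}

Inside a transform pivot, all group_by entries are folded into a single composite aggregation, so derivative never gets the date_histogram parent it requires, which is exactly what the caused_by in your error is complaining about.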

It's possible to recreate the rate using bucket_script, though it may not be exact. Something like:

PUT _transform/datastream_indexing_rate_transform
{
  "source": {
    "index": ".monitoring-es-8-mb-*",
    "query": {
      "bool": {
        "filter": [
          { "exists": { "field": "index_stats.primaries.indexing.index_total" } },
          { "exists": { "field": "elasticsearch.cluster.name" } },
          { "exists": { "field": "index_stats.index" } }
        ]
      }
    }
  },
  "dest": {
    "index": "datastream_indexing_rate_summary"
  },
  "pivot": {
    "group_by": {
      "@timestamp": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "5s"
        }
      },
      "cluster_name": {
        "terms": {
          "field": "elasticsearch.cluster.name"
        }
      },
      "datastream": {
        "terms": {
          "script": {
            "source": """
              def m = /(.+)-\d{4}\.\d{2}\.\d{2}-\d+$/.matcher(doc['index_stats.index'].value);
              if (m.matches()) {
                  return m.group(1);
              } else {
                  return doc['index_stats.index'].value;
              }
            """,
            "lang": "painless"
          }
        }
      }
    },
    "aggregations": {
      "max_total": {
        "max": {
          "field": "index_stats.primaries.indexing.index_total"
        }
      },
      "min_total": {
        "min": {
          "field": "index_stats.primaries.indexing.index_total"
        }
      },
      "indexing_rate": {
        "buckets_path": {
          "max": "max_total",
          "min": "min_total"
        },
        "script": "params.max > params.min ? params.max - params.min : 0"
      }
    }
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "30s"
    }
  },
  "settings": {
    "max_page_search_size": 5000
  }
}
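A couple of caveats with this approximation: the script yields the growth of the index_total counter within each 5s bucket, i.e. documents per bucket rather than per second, so divide the difference by the bucket width if you want docs/sec. It also needs at least two monitoring samples to land in each bucket; if only one sample falls into a bucket, max and min are equal and the rate comes out as 0, so you may want a fixed_interval larger than your monitoring collection interval.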

It's also worth noting that Transforms do not currently support writing to a data stream as the destination: Transform limitations | Elasticsearch Guide [8.17] | Elastic

If you want the Transform to write to a destination index that behaves like a data stream, you can either create the destination index before creating the Transform (the Transform will automatically reuse it), or set a retention_policy when you create the Transform, which tells the Transform to delete data older than a given age: Create transform API | Elasticsearch Guide [8.17] | Elastic
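For example, a minimal sketch of both options, assuming the field names produced by the pivot above and an arbitrary 30-day retention:

# Option 1: create the destination index up front so you control its mappings
# (any other fields the pivot emits, e.g. max_total/min_total, will be mapped dynamically)
PUT datastream_indexing_rate_summary
{
  "mappings": {
    "properties": {
      "@timestamp":    { "type": "date" },
      "cluster_name":  { "type": "keyword" },
      "datastream":    { "type": "keyword" },
      "indexing_rate": { "type": "double" }
    }
  }
}

# Option 2: a top-level retention_policy block in the PUT _transform body,
# which prunes documents older than max_age from the destination index
"retention_policy": {
  "time": {
    "field": "@timestamp",
    "max_age": "30d"
  }
}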

Unfortunately, I couldn't get it to work.

The first error: "Unknown aggregation type [buskets_path] did you mean any of [bucket_sort, bucket_script, bucket_selector]?"

If I use one of those instead, I get a different error: "Unknown field [max]"

It should be buckets_path, not buskets_path: Pipeline aggregations | Elasticsearch Guide [8.17] | Elastic

Also note that buckets_path is a parameter of a pipeline aggregation, not an aggregation type of its own, so it has to sit inside a bucket_script block. The "Unknown field [max]" error appears when the max/min entries end up directly under bucket_script instead of inside its buckets_path parameter.
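With both of those fixed, the aggregations section of the transform above would look like this (same field names as before):

  "aggregations": {
    "max_total": {
      "max": { "field": "index_stats.primaries.indexing.index_total" }
    },
    "min_total": {
      "min": { "field": "index_stats.primaries.indexing.index_total" }
    },
    "indexing_rate": {
      "bucket_script": {
        "buckets_path": {
          "max": "max_total",
          "min": "min_total"
        },
        "script": "params.max > params.min ? params.max - params.min : 0"
      }
    }
  }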