Daily scheduled transform from different indices

Hello,

We are currently ingesting data into an Elasticsearch cluster using Filebeat. We have configured Filebeat to create a new index every day, so the index names follow the pattern index_{+yyyymmdd}.

We want to schedule a transform at the end of each day that summarizes that day's complete data into a summary_index. The aggregation should compute the sum of a field per hour.

Example:
Input of the first transform
index_day_1

| Timestamp     | userID | field |
|---------------|--------|-------|
| day1T00:20:00 | user_a | 10    |
| day1T00:25:00 | user_b | 20    |
| day1T00:45:00 | user_a | 30    |
| day1T00:50:00 | user_b | 50    |

Input of the second transform
index_day_2

| Timestamp     | userID | field |
|---------------|--------|-------|
| day2T00:10:00 | user_a | 1     |
| day2T01:25:00 | user_b | 2     |
| day2T01:45:00 | user_a | 3     |
| day2T01:50:00 | user_b | 5     |

Transformation Output after two days -> summary_index

| day & hour    | userID | SUM(field) |
|---------------|--------|------------|
| day1T00:00:00 | user_a | 40         |
| day1T00:00:00 | user_b | 70         |
| day2T00:00:00 | user_a | 1          |
| day2T01:00:00 | user_a | 3          |
| day2T01:00:00 | user_b | 7          |

Please notice that we aggregate data per hour, per day, and per userID. We are having trouble finding the right tools for this task. We were wondering whether there is a way to perform this aggregation within Elasticsearch, without setting up an external cron job.
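
To make the target concrete: a one-off version of this rollup would look something like the following search aggregation (a sketch using the example index pattern and field names from the tables above; userID is assumed to be a keyword field). What we are missing is a way to run this on a schedule:

GET index_day*/_search
{
  "size": 0,
  "aggs": {
    "per_hour": {
      "date_histogram": {
        "field": "Timestamp",
        "fixed_interval": "1h"
      },
      "aggs": {
        "per_user": {
          "terms": {
            "field": "userID"
          },
          "aggs": {
            "sum_field": {
              "sum": {
                "field": "field"
              }
            }
          }
        }
      }
    }
  }
}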

We have checked the Watcher and Transforms documentation, but we have not found the right parameters to configure a periodic task like this (if such parameters exist).

Thanks in advance for any insights you could give us.


Hi!
There are two kinds of transforms we currently support:

  1. batch transform: starts, processes the given data, and finishes
  2. continuous transform: starts, processes the existing data, then periodically polls for changes in the source data and updates the destination index

If you go with 1. then you need to create and start the transform externally, using your own "cron" mechanism.
It seems that what you need is 2. (i.e.: the continuous transform). The following request gives exactly the results you provided:

GET _transform/_preview
{
  "source": {
    "index": "index_day*"
  },
  "dest": {
    "index": "summary_index"
  },
  "pivot": {
    "group_by": {
      "day_and_hour": {
        "date_histogram": {
          "field": "Timestamp",
          "fixed_interval": "1h"
        }
      },
      "userID": {
        "terms": {
          "field": "userID"
        }
      }
    },
    "aggregations": {
      "sum_field": {
        "sum": {
          "field": "field"
        }
      }
    }
  },
  "sync": {
    "time": {
      "field": "Timestamp",
      "delay": "1m"
    }
  },
  "frequency": "1h"
}

Once you are happy with your transform config and the preview you get, you can create such a transform and start it, and it will continuously update the summary index.
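
For example, creating and starting it would look like this (summary_transform is just a placeholder id; the body is the same config as in the preview above):

PUT _transform/summary_transform
{
  "source": {
    "index": "index_day*"
  },
  "dest": {
    "index": "summary_index"
  },
  "pivot": {
    "group_by": {
      "day_and_hour": {
        "date_histogram": {
          "field": "Timestamp",
          "fixed_interval": "1h"
        }
      },
      "userID": {
        "terms": {
          "field": "userID"
        }
      }
    },
    "aggregations": {
      "sum_field": {
        "sum": {
          "field": "field"
        }
      }
    }
  },
  "sync": {
    "time": {
      "field": "Timestamp",
      "delay": "1m"
    }
  },
  "frequency": "1h"
}

POST _transform/summary_transform/_start

You can then follow its progress with GET _transform/summary_transform/_stats.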

Hi!

Thank you for your answer. It seems to be an appropriate solution to the problem we are trying to solve. Our first approach was to calculate, at the end of the day, the aggregated values for each hour, running the operation over the complete day; you propose instead to compute the aggregations continuously, every hour, which satisfies the same requirement. I would like to ask you a few questions about your answer.

As far as I understand, the configuration you presented computes the aggregation every hour, since the frequency parameter is set to 1h. Does this mean that at the end of each hour the transform will compute the aggregation considering only the new documents received in the interval (now() - 1h, now()], or will it check every document matching the index pattern again? And does the date_histogram aggregation also separate the days (i.e., are hour_1Tday_1 and hour_1Tday_2 different buckets)?

Could you also give us some insight into the meaning of the sync object, particularly the delay field?

Thank you!
