Re-indexing an aggregation for later use

I am performing an aggregation on our company's daily data stream in Elastic. I am bucketing the data by the "mid" field and summing the amount field, "usd_amt", in the payload. It looks like this:

{
    "aggs": {
        "tpv": {
            "terms": {
                "field": "payload.mid",
                "order": { "total_volume": "desc" }
            },
            "aggs": {
                "total_volume": {
                    "sum": {
                        "field": "payload.usd_amt"
                    }
                }
            }
        }
    }
}

When I run the above aggregation, I see response data like this:

{
    "_id": "cal_tpv_agg_watch_0-2016-03-21T13:20:56.372Z",
    "result": {
        "execution_time": "2016-03-21T13:20:56.372Z",
        "execution_duration": 20,
        "input": {
            "aggregations": {
                "tpv": {
                    "buckets": [
                        {
                            "doc_count": 146,
                            "total_volume": {
                                "value": 1559432
                            },
                            "key": "12347"
                        },
                        {
                            "doc_count": 120,
                            "total_volume": {
                                "value": 1239380
                            },
                            "key": "12352"
                        }
                    ]
                }

I use an extract statement to put "aggregations.tpv.buckets" into ctx.payload.

I'd like to save the daily aggregation buckets data to another index (so that later we can roll up the data into weekly, monthly, or yearly amounts). I am doing this by using an index action to put the aggregation into another index.
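The later roll-up would most likely be another aggregation in Elasticsearch. Purely to illustrate the arithmetic, here is a minimal Python sketch of a weekly roll-up. It assumes each stored daily document has `mid`, `day`, and `total_volume` fields; those names and the function `roll_up_weekly` are hypothetical and not taken from any real index.

```python
from collections import defaultdict
from datetime import date


def roll_up_weekly(daily_docs):
    """Sum total_volume per (mid, ISO year, ISO week).

    The field names "mid", "day", and "total_volume" are assumptions
    made for this sketch only.
    """
    totals = defaultdict(int)
    for doc in daily_docs:
        iso = date.fromisoformat(doc["day"]).isocalendar()
        # iso[0] is the ISO year, iso[1] is the ISO week number
        totals[(doc["mid"], iso[0], iso[1])] += doc["total_volume"]
    return dict(totals)


# Made-up daily documents; two of them fall in the same week for mid 12347.
daily_docs = [
    {"mid": "12347", "day": "2016-03-21", "total_volume": 1559432},
    {"mid": "12347", "day": "2016-03-22", "total_volume": 1000},
    {"mid": "12352", "day": "2016-03-21", "total_volume": 1239380},
]
weekly = roll_up_weekly(daily_docs)
```

Monthly or yearly totals would follow the same pattern with a different grouping key.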

Does anyone have an example of transforming the "ctx.payload.tpv.buckets" data into the payload "_doc" field, to take advantage of the multi-document indexing described in "Actions - Multi-doc support"?
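To make the target shape concrete: as I understand the multi-doc support, the index action looks for a "_doc" field in the payload that holds an array, and indexes each element as its own document. The Python below only models the payload as a plain dict to show that shape. It is not Watcher code, and `wrap_buckets_as_docs` is a made-up helper name.

```python
def wrap_buckets_as_docs(payload, agg_name="tpv"):
    """Return a new payload whose "_doc" field lists one document per bucket.

    Illustration only: the payload is modeled as a plain Python dict.
    """
    buckets = payload["aggregations"][agg_name]["buckets"]
    return {"_doc": list(buckets)}


# The two buckets from the response shown above.
payload = {
    "aggregations": {
        "tpv": {
            "buckets": [
                {"doc_count": 146, "total_volume": {"value": 1559432}, "key": "12347"},
                {"doc_count": 120, "total_volume": {"value": 1239380}, "key": "12352"},
            ]
        }
    }
}
docs_payload = wrap_buckets_as_docs(payload)
```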

What's the best way of re-indexing aggregated data for later use?
Thanks, beckerdo

Has anyone transformed an aggregation to a multi-document suitable input for an index action?

I have the same issue. I'm trying to use a watch that runs an aggregation, but I want the results to be stored in a new index as multiple documents. Are there examples of how to do this?

Hey folks,

take this example for further testing

First, let's bulk index some docs

PUT /foo/bar/_bulk
{ "index" : { "_id" : "1" } }
{ "foo" : "bar" }
{ "index" : { "_id" : "2" } }
{ "foo" : "bar" }
{ "index" : { "_id" : "3" } }
{ "foo" : "baz" }
{ "index" : { "_id" : "4" } }
{ "foo" : "spam" }
{ "index" : { "_id" : "5" } }
{ "foo" : "spam" }
{ "index" : { "_id" : "6" } }
{ "foo" : "spam" }

After refresh, we should be able to search those and aggregate on them

GET /foo/bar/_search
{
  "size": 0,
  "aggs": {
    "the_foos": {
      "terms": {
        "field": "foo",
        "size": 10
      }
    }
  }
}

Let's get a watch up and running

PUT _watcher/watch/transform
{
  "input": {
    "search": {
      "request": {
        "indices": [
          "foo"
        ],
        "types": [
          "bar"
        ],
        "body": {
          "size": 0,
          "aggs": {
            "the_foos": {
              "terms": {
                "field": "foo",
                "size": 10
              }
            }
          }
        }
      }
    }
  },
  "trigger": {
    "schedule": {
      "interval": "1h"
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": "return [ _doc : ctx.payload.aggregations.the_foos.buckets ]"
      },
      "index": {
        "index": "my-index",
        "doc_type": "my-type"
      }
    }
  }
}

No need to wait, execute!

POST _watcher/watch/transform/_execute

Knowing we ran the watch, let's check the index for new documents!

GET my-index/my-type/_search

On my 2.2.1 test installation this showed three documents... of course you can change the documents in your script transform to whatever you want, but this should be a start.

IDs are generated automatically here.
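As a rough idea of what "whatever you want" could look like, here is the reshaping logic sketched in Python rather than in a watch script. The output field names (`foo`, `count`, `executed_at`) and the helper `reshape_bucket` are assumptions for illustration, and the timestamp is a placeholder. In a real watch the same logic would live in the script transform.

```python
def reshape_bucket(bucket, executed_at):
    """Flatten one terms bucket into the document we want to index.

    The output field names are hypothetical; pick whatever suits your index.
    """
    return {
        "foo": bucket["key"],
        "count": bucket["doc_count"],
        "executed_at": executed_at,
    }


# Buckets as the terms aggregation on "foo" would return them
# for the six documents indexed earlier.
buckets = [
    {"key": "spam", "doc_count": 3},
    {"key": "bar", "doc_count": 2},
    {"key": "baz", "doc_count": 1},
]

# One element per bucket under "_doc", so each becomes its own document.
new_payload = {
    "_doc": [reshape_bucket(b, "2016-03-21T13:20:56.372Z") for b in buckets]
}
```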

Hope this helps!

--Alex


Brilliant. This worked perfectly for me. And I was able to write a second aggregation using data from the first aggregation.

Is there a way to do the transform (from buckets to _doc) without the script? (For those servers that don't allow scripting.)

Thanks, Dan

Hey Dan,

not directly, but you can enable scripting specifically for watcher only by setting

script.engine.groovy.inline.elasticsearch-watcher_watch: on

Heads up: This setting is going to be renamed in 5.0.

--Alex

Alex...thank you. Your response on this thread was spot on and saved us. It would be helpful if there were more documentation online, including examples.

I have read through several Elasticsearch ebooks, but I haven't come across any that covers Watcher and multi-doc indexing.

-- Chad

This is not working for me with version 5.4.

I copied and pasted the provided example and got this error:

{
  "error": {
    "root_cause": [
      {
        "type": "general_script_exception",
        "reason": "failed to compile script [ScriptException[compile error]; nested: IllegalArgumentException[Variable [_doc] is not defined.];]"
      }
    ],
    "type": "general_script_exception",
    "reason": "failed to compile script [ScriptException[compile error]; nested: IllegalArgumentException[Variable [_doc] is not defined.];]"
  },
  "status": 500
}

Hey,

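you can try putting _doc in ticks (single quotes). In 5.x scripts default to Painless, which reads an unquoted map key as a variable rather than a string, hence the error. Also, please open new threads for new issues.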

Thanks a lot.

--Alex

ticks around _doc fixed it:

"transform": {
     "script": "return [ '_doc' : ctx.payload.aggregations.the_foos.buckets ]"
},

Thank you!