Can I use Transforms to collapse data?

I need to continuously collapse data by hour (grouped by a custom field), and the aggregated index must have the same mapping as the source index.
At first I wanted to use rollup, but it does not support scrolling, among other limitations.

Hi,

this should work if you group_by a date histogram and a term, and use index rollover for the source. A transform cannot delete data for you, but if you create e.g. daily indices you can use ILM to delete source indices that have been processed by the transform. Note that this is an "optimistic" approach: ILM will delete your source index whether or not the transform has run.
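
For illustration, a minimal continuous pivot along those lines could look like the sketch below. All index and field names are made up, and depending on your version the date_histogram takes interval or calendar_interval:

    PUT _data_frame/transforms/hourly_collapse
    {
      "source": {
        "index": "mydata-*"
      },
      "dest": {
        "index": "mydata_hourly"
      },
      "pivot": {
        "group_by": {
          "hour": {
            "date_histogram": {
              "field": "timestamp",
              "calendar_interval": "1h"
            }
          },
          "my_custom_field": {
            "terms": {
              "field": "my_custom_field"
            }
          }
        },
        "aggregations": {
          "total": {
            "sum": {
              "field": "some_numeric_field"
            }
          }
        }
      },
      "frequency": "5m",
      "sync": {
        "time": {
          "field": "timestamp",
          "delay": "60s"
        }
      }
    }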

The aggregated index gets created with mappings compatible with the source index when you start the transform. For special needs, or if mapping deduction is not possible (e.g. due to use of scripted_metric), you can create the destination index yourself with the mappings you want. Of course this mapping must be compatible with the aggregations you run, otherwise the transform will fail.
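
For example, pre-creating the destination index could look like this (hypothetical index and field names, adjust the types to your data):

    PUT my_dest_index
    {
      "mappings": {
        "properties": {
          "my_group_field": { "type": "keyword" },
          "my_sum": { "type": "double" }
        }
      }
    }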

I hope this answers the question; if not, please describe the use case in more detail.

But how can I forward fields that are not used in the terms group_by to the dest index?
And why does "missing" not work in transform terms?

Regarding "missing": that's currently not supported, see https://github.com/elastic/elasticsearch/issues/42941.

Can you give an example of what you mean by "unused"?
If you know all field names, this should be doable using a scripted metric aggregation, see https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-scripted-metric-aggregation.html

    "aggregations": {
      "properties": {
        "scripted_metric": {
          "init_script": "state.join = new HashMap()",
          "map_script": "String[] fields = new String[] {'field_a', 'field_b', 'field_c'}; for (e in fields) { if (doc.containsKey(e)) {state.join.put(e, doc[e])}}",
          "combine_script": "return state.join",
          "reduce_script": "String[] fields = new String[] {'field_a', 'field_b', 'field_c'}; Map j=new HashMap(); for (s in states) {for (e in fields) { if (s.containsKey(e)) {j.put(e, s[e].get(0))}}} return j;"
        }
      }

If I use transforms to aggregate documents, I can't save fields that are not used in group_by to the dest index.
    PUT _data_frame/transforms/testconc
    {
      "source": {
        "index": "from"
      },
      "pivot": {
        "group_by": {
          "period": {
            "terms": {
              "field": "period"
            }
          },
          "field_a": {
            "terms": {
              "field": "my_field_a"
            }
          }
        },
        "aggregations": {
          "u": {
            "sum": {
              "field": "u"
            }
          }
        }
      },
      "dest": {
        "index": "trindex"
      },
      "frequency": "5m",
      "sync": {
        "time": {
          "field": "createdAt",
          "delay": "30s"
        }
      }
    }

But documents have my_field_b (which may not exist), and I want to save it to the destination index:
my_field_b from any document in the grouped bucket.
For example, the existence of a "first"/"any" aggregation would help me.

This should be doable via scripted metric:

      "scripted_metric": {
          "init_script": "state.b = new String()",
          "map_script": "state.b = doc['field_b']",
          "combine_script": "return state.b",
          "reduce_script": "return states.get(0).get(0)"
        }
      }

This should take the 1st value for field_b. I admit this isn't super user-friendly.
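
If field_b can be missing from some documents (as in your case), a variant along these lines (an untested sketch) guards against missing values and extracts the value itself instead of the doc values object:

      "scripted_metric": {
        "init_script": "state.b = null",
        "map_script": "if (doc.containsKey('field_b') && doc['field_b'].size() > 0) { state.b = doc['field_b'].value }",
        "combine_script": "return state.b",
        "reduce_script": "for (s in states) { if (s != null) { return s }} return null"
      }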

Thank you, I had already arrived at this option.
Can you tell me how using 5+ of these fields will affect performance?

The impact on performance is low, because the script is compiled into bytecode. The execution of the scripts is as fast as compiled Java code; Painless is not interpreted at runtime.

But the compiled bytecode executes for every document. Isn't that slow?

Every aggregation has to execute code on the matched documents, whether you execute just 1 or n aggregations. Loading the matched document(s) is much more expensive than a small operation executed on them. IO might be required to load the docs, which is much slower than some simple computation.

IO is also involved in a multi-node setup when it comes to communication (network traffic) between nodes. The place of execution is what makes aggregations fast: as much as possible is executed locally on the node that holds the data, and only the final reduce phase is executed on the coordinating node. So in our example, the init, map, and combine steps are done on the node that holds the shard, and only the reduce script is executed on the coordinating node (the node that runs the transform task).

Say you have 100 documents that collapse into 1 bucket: you are not sending 100 documents over the network, but at most one partial state per shard in your source index(es).


thanks
