Pivot transform copying values

I'm attempting my first pivot transform, with the goal of calculating the duration between timestamps. The group_by clause groups two documents, each with a timestamp. I have been able to calculate the duration using min and max aggregations, along with a bucket_script, like this:

  "pivot": {
    "group_by": { 
      "linkId": { "terms": { "field": "linkId" }},
      "string_metadata_hash": { "terms": { "field": "string_metadata_hash" }}
    },
    "aggregations": {
      "start": {
        "min": {
          "field": "timestamp"
        }
      },
      "stop": {
        "max": {
          "field": "timestamp"
        }
      },
      "duration_sec": {
        "bucket_script": {
          "buckets_path": {
            "start": "start.value",
            "stop": "stop.value"
          },
          "script": "return (params.stop - params.start)/1000;"
        }
      }
    }
  }

The result documents contain duration calculation I want, and the aggregation fields, but nothing else:

 "preview" : [
    {
      "linkId" : "<uuid>",
      "string_metadata_hash" : <hash value>",
      "stop" : "2022-02-18T20:21:32.842Z",
      "start" : "2022-02-18T20:19:39.934Z",
      "duration_sec" : 112.908
    },
   ...
  ]

There are a number of other fields nested within a sub-object, metadata: for example foo, a string, and bar, an integer, among several others. How can I include them in the result? Note that they are guaranteed to be identical between the two documents; I just need a copy of the values from either one.

Do I need a second transform, combining this destination index with the original source? Or are there aggregations I can use? Ideally it would copy the whole metadata object without enumerating the individual fields. Or must I enumerate these fields in the group_by clause?

For the pivot function, the destination index output fields must be specified in either the group_by section or the aggregations section. It would be best to experiment against your local data to see which is most appropriate.

If using aggregations, then the top metrics aggregation is likely what you'll need: Create transform API | Elasticsearch Guide [8.0] | Elastic
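A rough sketch of what that could look like, assuming sub-fields named metadata.foo and metadata.bar (one top_metrics entry per field you want to copy; top_metrics requires a sort, and since the values are identical across the grouped documents, either sort direction should do):

```json
"aggregations": {
  "metadata.foo": {
    "top_metrics": {
      "metrics": { "field": "metadata.foo" },
      "sort": { "timestamp": "desc" }
    }
  },
  "metadata.bar": {
    "top_metrics": {
      "metrics": { "field": "metadata.bar" },
      "sort": { "timestamp": "desc" }
    }
  }
}
```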

Hope this helps

I suppose adding a scripted metric aggregation to the transform is a possible way. You can create a custom metric that just retrieves the first document and discards the rest.
If your fields are all numeric, the top metrics aggregation could be another option, though you need to list every field you want.

Something like this:

"aggs":{
    "one_document":{
      "scripted_metric": {
        "init_script": "state.doc = new HashMap()",
        "map_script": "if (state.doc.isEmpty()){state.doc = new HashMap(params['_source'])}",
        "combine_script": "return state.doc",
        "reduce_script": "return states[0]"
      }
    }
  }

Top metrics does actually support keywords (naming is hard) and I suspect it is more performant than a scripted metric -- but it depends on your data and both should work.

Thank you for correcting my imprecise post. Top metrics can also be used for keyword fields.

@iamtheschmitzer
The supported field types for the top metrics aggregation are described here.

Thank you, this did the job, though I added .metadata to the map_script


I will look at that next, thank you

I attempted top_metrics, but it gave me null results. Not sure if this is related, but the context-sensitive autocomplete did not offer top_metrics as a suggestion, only top_hits. I'm not sure whether that is because it's inside a pivot transform.

I suppose the top hits aggregation is not supported in pivot transforms. Could you share the whole settings of that transform?

Sure, appreciate the help

  "source": {
    "index": "source-name"
  },
  "dest" : { 
    "index" : "dest-name"
  },
  "pivot": {
    "group_by": { 
      "linkId": { "terms": { "field": "linkId" }},
      "string_metadata_hash": { "terms": { "field": "string_metadata_hash" }},
    },
    "aggregations": {
      "metadata": {
        "scripted_metric": {
          "init_script": "state.doc = new HashMap()",
          "map_script": "if (state.doc.isEmpty()){state.doc = new HashMap(params['_source'].metadata)}",
          "combine_script": "return state.doc",
          "reduce_script": "return states[0]"
        }
      },
      "start": {
        "min": {
          "field": "timestamp"
        }
      },
      "complete": {
        "max": {
          "field": "timestamp"
        }
      },
      "duration_sec": {
        "bucket_script": {
          "buckets_path": {
            "start": "start.value",
            "complete": "complete.value"
          },
          "script": "return (params.complete - params.start)/1000;"
        }
      }
    }
  },
   "sync": {
    "time": { 
      "field": "timestamp",
      "delay": "60s"
    }
  }
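As an aside: settings like these can be sanity-checked before creating the transform by posting the source and pivot sections to the preview transform API. A minimal sketch, reusing the index and field names from above:

```json
POST _transform/_preview
{
  "source": {
    "index": "source-name"
  },
  "pivot": {
    "group_by": {
      "linkId": { "terms": { "field": "linkId" }}
    },
    "aggregations": {
      "start": { "min": { "field": "timestamp" }}
    }
  }
}
```

The response shows sample destination documents and the generated mappings, which makes it easier to spot null fields before indexing anything.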

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.