How to aggregate data in Elasticsearch sent by Logstash

Hi,

you should be able to do this using a transform (see https://www.elastic.co/guide/en/elasticsearch/reference/7.4/put-transform.html). The workflow would look like this:

  1. You index the data into Elasticsearch as single docs; this is the "source" index.
  2. You create a transform that pulls data from "source" and aggregates it according to your needs into a "dest" index.
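As a rough sketch, step 2 could look like the request below. The transform id, index names, and fields are illustrative placeholders; also note that on 7.4 the endpoint is `PUT _data_frame/transforms/<id>`, while from 7.5 onwards it is `PUT _transform/<id>`:

    PUT _transform/household_rollup
    {
      "source": { "index": "source" },
      "dest": { "index": "dest" },
      "pivot": {
        "group_by": {
          "entityId": { "terms": { "field": "entityId" } }
        },
        "aggregations": {
          "lastUpdated": { "max": { "field": "lastUpdated" } }
        }
      }
    }

The "pivot" section is where the grouping and the aggregations described below go.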

In your case you would group by entity id, firm id, etc. and define aggregations; e.g. lastUpdated would be a max aggregation. Combining the docs is trickier, but doable with a scripted metric aggregation:

"associatedHouseholds": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": "state.docs.add(new HashMap(params['_source']))",
          "combine_script": "return state.docs",
          "reduce_script": "def ret = []; for (s in states) {for (d in s) { ret.add(d);}}return ret"
        }
      }
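Putting it together, the pivot section of the transform would combine the group_by fields with both kinds of aggregations. The field names here are taken from your description and may differ in your actual mapping:

    "pivot": {
      "group_by": {
        "entityId": { "terms": { "field": "entityId" } },
        "firmId": { "terms": { "field": "firmId" } }
      },
      "aggregations": {
        "lastUpdated": { "max": { "field": "lastUpdated" } },
        "associatedHouseholds": {
          "scripted_metric": {
            "init_script": "state.docs = []",
            "map_script": "state.docs.add(new HashMap(params['_source']))",
            "combine_script": "return state.docs",
            "reduce_script": "def ret = []; for (s in states) { for (d in s) { ret.add(d); } } return ret"
          }
        }
      }
    }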

In order to keep "dest" up to date as new data comes in, you need to sync "source" and "dest" via a timestamp field. It seems lastUpdated is also the ingest timestamp, so it can be used for this.
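For that continuous mode, the sync is configured on the transform roughly like this (the delay value is an assumption; it should be large enough to cover your ingest lag):

    "sync": {
      "time": {
        "field": "lastUpdated",
        "delay": "60s"
      }
    }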

Note that instead of writing directly to an index, the transform can write through an ingest pipeline; this enables you to do further operations on the aggregated data before it is stored.
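Concretely, you would reference the pipeline in the transform's dest section. The pipeline name below is made up for illustration, and the pipeline must exist before you start the transform:

    "dest": {
      "index": "dest",
      "pipeline": "post_aggregation_pipeline"
    }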

I hope this helps!
