Hi,
you should be able to do this using a transform (see https://www.elastic.co/guide/en/elasticsearch/reference/7.4/put-transform.html). The workflow would look like this:
- you index the data into Elasticsearch as single docs; this is the "source" index
- you create a transform that pulls data from the "source" index and aggregates it according to your needs into a "dest" index (both steps are sketched below).
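To make the first step concrete, here is a minimal sketch using hypothetical index and field names (households-source, entityId, firmId, lastUpdated); substitute your own mapping:

# hypothetical example, adjust index and field names to your data
POST households-source/_doc
{
  "entityId": "E-1",
  "firmId": "F-42",
  "householdName": "Smith",
  "lastUpdated": "2019-11-20T10:15:00Z"
}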
In your case you would group by entity id, firm id, etc. and define aggregations; e.g. lastUpdated would be a max aggregation. Combining the docs is trickier, but doable with a scripted metric aggregation:
"associatedHouseholds": {
"scripted_metric": {
"init_script": "state.docs = []",
"map_script": "state.docs.add(new HashMap(params['_source']))",
"combine_script": "return state.docs",
"reduce_script": "def ret = []; for (s in states) {for (d in s) { ret.add(d);}}return ret"
}
}
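Putting it together, here is a sketch of the full transform definition using the same hypothetical names. Note that on 7.4 the endpoint is still PUT _data_frame/transforms/<transform_id>; from 7.5 on it is PUT _transform/<transform_id>. If entityId and firmId are mapped as text, group on their .keyword sub-fields instead:

# sketch only: transform id, index and field names are hypothetical
PUT _transform/households-pivot
{
  "source": { "index": "households-source" },
  "dest": { "index": "households-dest" },
  "pivot": {
    "group_by": {
      "entityId": { "terms": { "field": "entityId" } },
      "firmId": { "terms": { "field": "firmId" } }
    },
    "aggregations": {
      "lastUpdated": { "max": { "field": "lastUpdated" } },
      "associatedHouseholds": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": "state.docs.add(new HashMap(params['_source']))",
          "combine_script": "return state.docs",
          "reduce_script": "def ret = []; for (s in states) { for (d in s) { ret.add(d); } } return ret"
        }
      }
    }
  }
}

You can check what the transform would produce with the preview endpoint (POST _transform/_preview, or POST _data_frame/transforms/_preview on 7.4) before creating it.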
In order to do this continuously as data comes in, you need to sync source and dest via a timestamp field; it seems lastUpdated is also the ingest timestamp and can therefore be used for this.
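A sketch of what that would add to the transform definition above (the 60s delay is just an example value), followed by starting the transform:

# added to the transform definition body from the sketch above
"sync": {
  "time": {
    "field": "lastUpdated",
    "delay": "60s"
  }
}

# start the continuous transform (on 7.4: POST _data_frame/transforms/households-pivot/_start)
POST _transform/households-pivot/_start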
Note that instead of writing straight to an index, you can route the transform output through an ingest pipeline; this enables you to do further operations on the aggregated data.
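For example, a hypothetical pipeline that counts the combined docs, referenced from the transform's dest section (pipeline name and field names are again made up):

# hypothetical pipeline, adjust the script to whatever post-processing you need
PUT _ingest/pipeline/households-postprocess
{
  "description": "post-process the pivoted household docs",
  "processors": [
    {
      "script": {
        "source": "if (ctx.associatedHouseholds != null) { ctx.householdCount = ctx.associatedHouseholds.size() }"
      }
    }
  ]
}

# in the transform definition, reference the pipeline in dest
"dest": {
  "index": "households-dest",
  "pipeline": "households-postprocess"
}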
I hope this helps!