Concatenating arrays of objects in Elasticsearch transform aggregations

I'm currently using an Elasticsearch transform to "merge" similar documents. It works very well; however, I've run into an issue when trying to concatenate fields that contain an array of objects.

For example, if I have two documents in the form:

{
    "_source": {
        "name": "xxx",
        "values": [
            { 
                "label": "xxx-a",
                "otherA": 123,
                "otherB": "abc"
            }
        ]
    }
}

{
    "_source": {
        "name": "xxx",
        "values": [
            { 
                "label": "xxx-b",
                "otherA": 456,
                "otherB": "def"
            }
        ]
    }
}

With the following mappings:

{
    "properties": {
        "name": {
            "type": "keyword"
        },
        "values": {
            "properties": {
                "label": {
                    "type": "keyword"
                },
                "otherA": {
                    "type": "integer"
                },
                "otherB": {
                    "type": "keyword"
                }
            }
        }
    }
}

My ideal result of the transform would be a single document in the form:

{
    "name": "xxx",
    "values": [
        { 
            "label": "xxx-a",
            "otherA": 123,
            "otherB": "abc"
        },
        { 
            "label": "xxx-b",
            "otherA": 456,
            "otherB": "def"
        }
    ]
}

My current transform pivot is:

{
  "pivot": {
    "group_by": {
      "name": {
        "terms": {
          "field": "name"
        }
      }
    },
    "aggregations": {
      "temporary.valuesLabel": {
        "terms": {
          "field": "values.label"
        }
      },
      "temporary.valuesOtherA": {
        "terms": {
          "field": "values.otherA"
        }
      },
      "temporary.valuesOtherB": {
        "terms": {
          "field": "values.otherB"
        }
      }
    }
  }
}

I use an ingest pipeline with the following script to try to reassemble the objects:

// Build one object per distinct label (the keys of the terms agg on values.label)
ctx.values = !ctx.temporary.valuesLabel.keySet().isEmpty() ? ctx.temporary.valuesLabel.keySet().stream().map(label -> ['label': label]).collect(Collectors.toList()) : [];

// Distinct values of the other fields, in the order their own terms aggs returned them
List valuesOtherA = !ctx.temporary.valuesOtherA.keySet().isEmpty() ? ctx.temporary.valuesOtherA.keySet().stream().collect(Collectors.toList()) : [];
List valuesOtherB = !ctx.temporary.valuesOtherB.keySet().isEmpty() ? ctx.temporary.valuesOtherB.keySet().stream().collect(Collectors.toList()) : [];

// Pair the lists up by index (ctx.values is a List, so use size(), not length)
for (int i = 0; i < ctx.values.size(); i++) {
  ctx.values[i].otherA = Integer.parseInt(valuesOtherA.get(i));
  ctx.values[i].otherB = valuesOtherB.get(i);
}

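This almost works, except that each terms agg collects the distinct values of its own field independently and orders its buckets by doc count, then by key. Bucket i of one agg therefore doesn't necessarily come from the same object as bucket i of another (and repeated values collapse into a single bucket, so the lists can even differ in length). The result is "misaligned" buckets and "mixed" objects.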
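To make the misalignment concrete, here is a small plain-Java simulation (Painless is Java-like, but this does not call Elasticsearch at all). It assumes two hypothetical documents in which xxx-a has the larger otherA value, and it mimics each terms agg by taking the distinct values of one field, ordered by count descending and then by key. Zipping the three key lists by index, as the pipeline script does, pairs xxx-a with 123 and abc, which actually belong to the other object.

```java
import java.util.*;
import java.util.stream.*;

class MisalignmentDemo {
    // Hypothetical flattened "values" objects; otherA/otherB chosen so that
    // sorting each field on its own breaks the original pairing.
    static List<Map<String, Object>> docs() {
        return List.of(
            Map.<String, Object>of("label", "xxx-a", "otherA", 456, "otherB", "def"),
            Map.<String, Object>of("label", "xxx-b", "otherA", 123, "otherB", "abc"));
    }

    // Mimics one terms agg: distinct values of a single field, ordered by
    // doc count descending, ties broken by key ascending. Keys are compared
    // as strings here, a simplification of the real numeric ordering.
    static List<String> termsKeys(List<Map<String, Object>> docs, String field) {
        Map<String, Long> counts = docs.stream()
            .collect(Collectors.groupingBy(d -> String.valueOf(d.get(field)), Collectors.counting()));
        return counts.entrySet().stream()
            .sorted((a, b) -> {
                int byCount = Long.compare(b.getValue(), a.getValue()); // count desc
                return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey()); // key asc
            })
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    // Re-assembles objects by index, like the pipeline script's for loop.
    static List<Map<String, String>> zip(List<Map<String, Object>> docs) {
        List<String> labels = termsKeys(docs, "label");
        List<String> otherA = termsKeys(docs, "otherA");
        List<String> otherB = termsKeys(docs, "otherB");
        List<Map<String, String>> out = new ArrayList<>();
        for (int i = 0; i < labels.size(); i++) {
            out.add(Map.of("label", labels.get(i), "otherA", otherA.get(i), "otherB", otherB.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(zip(docs()));
    }
}
```

Because the fields are sorted separately, nothing ties index 0 of the otherA list to index 0 of the label list; only a single aggregation that keeps each object whole can guarantee the pairing.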

Is there a way to have all the bucket indices get a fixed ordering? Is there a better way to achieve what I am trying to do?

Thank you for your time.

I think you can do this more easily with a scripted_metric. An Advent post from 2 years ago discusses a similar case; unfortunately it's in German, but just use your favorite online translator:

In short, you need something like:

"values": {
  "scripted_metric": {
    "init_script": "state.docs = []",
    "map_script": "for (v in params['_source']['values']) { state.docs.add(new HashMap(v)); }",
    "combine_script": "return state.docs",
    "reduce_script": "def docs = []; for (s in states) { for (d in s) { docs.add(d); } } return docs"
  }
}

You'll probably need to tweak the map script a bit, e.g. so that it doesn't fail if values is missing from a document.
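A plain-Java sketch of what the scripted_metric does across shards may help show why it avoids the problem: each shard's map phase copies every object from values whole, the combine phase returns that shard's list, and the reduce phase concatenates the lists, so fields that belong together are never separated. The class and method names and the shard split are hypothetical, and the map step skips documents without a values list (the tweak mentioned above).

```java
import java.util.*;

class ScriptedMetricSketch {
    // init_script + map_script + combine_script for one shard: collect every
    // object of each document's "values" array into shard-local state.
    static List<Map<String, Object>> mapShard(List<Map<String, Object>> shardDocs) {
        List<Map<String, Object>> state = new ArrayList<>(); // init_script: state.docs = []
        for (Map<String, Object> source : shardDocs) {      // map_script runs once per document
            Object values = source.get("values");
            if (values instanceof List<?>) {                 // skip docs without a values array
                for (Object v : (List<?>) values) {
                    @SuppressWarnings("unchecked")
                    Map<String, Object> obj = (Map<String, Object>) v;
                    state.add(new HashMap<>(obj));           // copy the whole object, fields stay together
                }
            }
        }
        return state;                                        // combine_script: return state.docs
    }

    // reduce_script: concatenate the per-shard lists in the order they arrive.
    static List<Map<String, Object>> reduce(List<List<Map<String, Object>>> states) {
        List<Map<String, Object>> docs = new ArrayList<>();
        for (List<Map<String, Object>> s : states) {
            docs.addAll(s);
        }
        return docs;
    }
}
```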


Thank you very much for the quick response. The scripted_metric works perfectly!

I also needed to remove duplicate entries (which existed in the data), so my final aggregation was:

"values": {
    "scripted_metric": {
        "init_script": "state.docs = []",
        "map_script": "def values = params['_source']['values'] != null ? new ArrayList(params['_source']['values']) : new ArrayList(); for (v in values) { state.docs.add(new HashMap(v)); }",
        "combine_script": "return state.docs",
        "reduce_script": "def dedupe = new HashSet(); def docs = []; for (s in states) {for (d in s) { if (!dedupe.contains(d)) { dedupe.add(d); docs.add(d); } } } return docs"
    }
}
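The deduplication in that reduce_script works because HashMap compares by content: two maps with the same keys and equal values are equal and have the same hash code, so the HashSet recognizes a repeated object even when it comes from a different shard. Below is a plain-Java sketch of the same reduce step (class and method names are hypothetical); it uses the boolean returned by Set.add instead of a separate contains check, which keeps the same behavior.

```java
import java.util.*;

class DedupeSketch {
    // Walks every shard state in order and keeps only the first occurrence of
    // each object. AbstractMap.equals/hashCode compare entries, so maps with
    // the same label/otherA/otherB count as duplicates.
    static List<Map<String, Object>> reduce(List<List<Map<String, Object>>> states) {
        Set<Map<String, Object>> seen = new HashSet<>();
        List<Map<String, Object>> docs = new ArrayList<>();
        for (List<Map<String, Object>> s : states) {
            for (Map<String, Object> d : s) {
                if (seen.add(d)) { // add() returns false if an equal map is already present
                    docs.add(d);
                }
            }
        }
        return docs;
    }
}
```

The resulting order depends on the order in which shard states reach the reduce phase, so it is safest not to rely on the order of the merged values array.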

Cheers

