Elasticsearch: transpose and aggregate?

I am using ES 6.5. When I fetch the required messages, I have to transpose and aggregate them. See the example below for more details.

Messages retrieved (2 example messages):

{
    "_index": "index_name",
    "_type": "data",
    "_id": "data_id",
    "_score": 5.0851293,
    "_source": {
        "header": {
            "id": "System_20190729152502239_57246_16667",
            "creationTimestamp": "2019-07-29T15:25:02.239Z"
        },
        "messageData": {
            "messageHeader": {
                "date": "2019-06-03",
                "mId": "1000",
                "mDescription": "TEST"
            },
            "messageBreakDown": [
                {
                    "category": "New",
                    "subCategory": "Sub",
                    "messageDetails": [
                        {
                            "Amount": 5.30
                        }
                    ]
                }
            ]
        }
    }
},
{
    "_index": "index_name",
    "_type": "data",
    "_id": "data_id",
    "_score": 5.09512,
    "_source": {
        "header": {
            "id": "System_20190729152502239_57246_16667",
            "creationTimestamp": "2019-07-29T15:25:02.239Z"
        },
        "messageData": {
            "messageHeader": {
                "date": "2019-06-03",
                "mId": "1000",
                "mDescription": "TEST"
            },
            "messageBreakDown": [
                {
                    "category": "Old",
                    "subCategory": "Sub",
                    "messageDetails": [
                        {
                            "Amount": 4.30
                        }
                    ]
                }
            ]
        }
    }
}

Now I am looking for a query to post to ES which will transpose the data and group it by category and subCategory.

Desired data output:

So basically, if you check the messages, they have the same header.id (which is the main search criterion). Within this header.id, one message is for category New and the other for Old (messageData.messageBreakDown is an array, and category is a value inside each element).

So ideally, as you can see in the output, both messages belong to the same mId, which has a New price and an Old price.

  • How do I aggregate to get the desired result?
  • Can the final output message contain only the desired fields, e.g. date, mId, mDescription, New price and Old price (both prices in one output, as in the example row after this list)?
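
For instance, using the two sample messages above, the single combined row would look roughly like this (values taken straight from the samples):

date        mId    mDescription  New price  Old price
2019-06-03  1000   TEST          5.30       4.30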

Below is the mapping:

{
  "index_name": {
    "mappings": {
      "data": {
        "properties": {
          "header": {
            "properties": {
              "id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
              "creationTimestamp": { "type": "date" }
            }
          },
          "messageData": {
            "properties": {
              "messageBreakDown": {
                "properties": {
                  "category": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "messageDetails": { "properties": { "Amount": { "type": "float" } } },
                  "subCategory": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                }
              },
              "messageHeader": {
                "properties": {
                  "mDescription": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "mId": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "date": { "type": "date" }
                }
              }
            }
          }
        }
      }
    }
  }
}

Something like the following aggregation request should do the trick:

GET index_name/_search
{
  "size": 0,
  "aggs": {
    "HeaderIds": {
      "terms": {
        "field": "header.id.keyword",
        "size": 10
      },
      "aggs": {
        "mIds": {
          "terms": {
            "field": "messageData.messageHeader.mId.keyword",
            "size": 10
          },
          "aggs": {
            "Date": {
              "max": {
                "field": "messageData.messageHeader.date"
              }
            },
            "New": {
              "filter": {
                "match": {
                  "messageData.messageBreakDown.category": "New"
                }
              },
              "aggs": {
                "Amount": {
                  "max": {
                    "field": "messageData.messageBreakDown.messageDetails.Amount"
                  }
                }
              }
            },
            "Old": {
              "filter": {
                "match": {
                  "messageData.messageBreakDown.category": "Old"
                }
              },
              "aggs": {
                "Amount": {
                  "max": {
                    "field": "messageData.messageBreakDown.messageDetails.Amount"
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

Note that the terms aggregation will only return up to 10 header.ids and mIds. If you have a lot of different header.ids, you may want to wrap the terms aggregation in a composite aggregation.
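
For example, a rough, untested sketch of that composite variant could look something like the request below (the source names and the page size are just placeholders; you would page through the buckets using the after_key returned in each response):

GET index_name/_search
{
  "size": 0,
  "aggs": {
    "HeaderIdsAndMIds": {
      "composite": {
        "size": 1000,
        "sources": [
          { "headerId": { "terms": { "field": "header.id.keyword" } } },
          { "mId": { "terms": { "field": "messageData.messageHeader.mId.keyword" } } }
        ]
      },
      "aggs": {
        "Date": {
          "max": { "field": "messageData.messageHeader.date" }
        },
        "New": {
          "filter": { "match": { "messageData.messageBreakDown.category": "New" } },
          "aggs": {
            "Amount": { "max": { "field": "messageData.messageBreakDown.messageDetails.Amount" } }
          }
        },
        "Old": {
          "filter": { "match": { "messageData.messageBreakDown.category": "Old" } },
          "aggs": {
            "Amount": { "max": { "field": "messageData.messageBreakDown.messageDetails.Amount" } }
          }
        }
      }
    }
  }
}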

Thanks, this helped me with my sample data. I am now trying to apply it to my more complex data and am facing an issue.

So, here is the bigger data set:

{
                "_index": "indexName",
                "_type": "data",
                "_id": "System_20190802215202794_44393_193_000000005",
                "_score": 14.978437,
                "_source": {
                    "header": {
                        "minorId": "System_20190802215202794_44393_193_000000005",
                        "mainId": "System_20190802215202794_44393_193",
                        "sourceSystemCreationTimestamp": "2019-08-02T21:52:02.794Z"
                    },
                    "messageData": {
                        "messageHeader": {
                            "messageType": "Daily",
                            "businessDate": "2019-02-25",
                            "mId": "1111",
                            "mDescription": "TEST"
                        },
                        "messageBreakDown": [
                            {
                                "currency": "eur",
                                "messageDetails": [
                                    {
                                        "localAmount": 10,
                                        "convertedAmount": 11.05
                                    }
                                ]
                            },
                            {
                                "currency": "aud",
                                "messageDetails": [
                                    {
                                        "localAmount": 10,
                                        "convertedAmount": 6.87
                                    }
                                ]
                            }
                        ]
                    }
                }
},
{
                "_index": "indexName",
                "_type": "data",
                "_id": "System_20190802215202794_44393_193_000000006",
                "_score": 14.978437,
                "_source": {
                    "header": {
                        "minorId": "System_20190802215202794_44393_193_000000006",
                        "mainId": "System_20190802215202794_44393_193",
                        "sourceSystemCreationTimestamp": "2019-08-02T21:52:02.794Z"
                    },
                    "messageData": {
                        "messageHeader": {
                            "messageType": "Monthly",
                            "businessDate": "2019-02-25",
                            "mId": "1111",
                            "mDescription": "TEST"
                        },
                        "messageBreakDown": [
                            {
                                "currency": "eur",
                                "messageDetails": [
                                    {
                                        "localAmount": 15,
                                        "convertedAmount": 16.57
                                    }
                                ]
                            },
                            {
                                "currency": "aud",
                                "messageDetails": [
                                    {
                                        "localAmount": 15,
                                        "convertedAmount": 10.30
                                    }
                                ]
                            }
                        ]
                    }
                }
}

Basically, I am looking to aggregate and sum the data as follows:
Aggregate by:

  • businessDate (2019-02-25)
  • mId (1111)
  • currency (eur and aud)

Sum:

  • localAmount (Daily and Monthly)
  • convertedAmount (Daily and Monthly)

Basically, the 2 messages above (which belong to the same mId and date) should produce 2 rows (as we have the currencies aud and eur) with their corresponding localAmount and convertedAmount.

date        mId   currency  dailyLocal  dailyConverted  monthlyLocal  monthlyConverted
25/02/2019  1111  eur       10          11.05           15            16.57
25/02/2019  1111  aud       10          6.87            15            10.3

So, I tried to apply the approach above: I first aggregate on businessDate, mId and currency, and then create filters for dailyLocal, dailyConverted, monthlyLocal and monthlyConverted.

Query:

GET indexName/_search
{
  "size": 0,
  "aggs": {
    "businessDate": {
      "terms": {
        "field": "messageData.messageHeader.businessDate",
        "size": 10
      },
      "aggs": {
        "mId": {
          "terms": {
            "field": "messageData.messageHeader.mId.keyword",
            "size": 10
          },
          "aggs": {
            "Currency": {
              "terms": {
                "field": "messageData.messageBreakDown.currency.keyword",
                "size": 10
              }
            },
            "dailyLocal": {
              "filter": {
                "match": { "messageData.messageHeader.messageType": "Daily" }
              },
              "aggs": {
                "Local": {
                  "sum": {
                    "field": "messageData.messageBreakDown.messageDetails.localAmount"
                  }
                }
              }
            },
            "dailyConverted": {
              "filter": {
                "match": { "messageData.messageHeader.messageType": "Daily" }
              },
              "aggs": {
                "Converted": {
                  "sum": {
                    "field": "messageData.messageBreakDown.messageDetails.convertedAmount"
                  }
                }
              }
            },
            "monthlyLocal": {
              "filter": {
                "match": { "messageData.messageHeader.messageType": "Monthly" }
              },
              "aggs": {
                "Local": {
                  "sum": {
                    "field": "messageData.messageBreakDown.messageDetails.localAmount"
                  }
                }
              }
            },
            "monthlyConverted": {
              "filter": {
                "match": { "messageData.messageHeader.messageType": "Monthly" }
              },
              "aggs": {
                "Converted": {
                  "sum": {
                    "field": "messageData.messageBreakDown.messageDetails.convertedAmount"
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

But I am not getting the desired output (see the response below).

The sums and the currency filter are not at the desired level. It seems like it is summing everything, irrespective of the filters. In fact, the sums should come under each currency, but I am not getting them that way.
So how should I also sum based on currency, i.e. make currency the lowest level of granularity?

"aggregations": {
  "businessDate": {
    "doc_count_error_upper_bound": 0,
    "sum_other_doc_count": 0,
    "buckets": [
      {
        "key": 1551052800000,
        "key_as_string": "2019-02-25T00:00:00.000Z",
        "doc_count": 10,
        "mId": {
          "doc_count_error_upper_bound": 0,
          "sum_other_doc_count": 0,
          "buckets": [
            {
              "key": "1111",
              "doc_count": 10,
              "dailyLocal": {
                "doc_count": 2,
                "Local": {
                  "value": 50
                }
              },
              "monthlyLocal": {
                "doc_count": 2,
                "Local": {
                  "value": 50
                }
              },
              "dailyConverted": {
                "doc_count": 2,
                "Local": {
                  "value": 44.79
                }
              },
              "monthlyConverted": {
                "doc_count": 2,
                "Local": {
                  "value": 44.79
                }
              },
              "Currency": {
                "doc_count_error_upper_bound": 2,
                "sum_other_doc_count": 150,
                "buckets": [
                  {
                    "key": "aud",
                    "doc_count": 2
                  },
                  {
                    "key": "eur",
                    "doc_count": 2
                  }
                ]
              }
            }
          ]
        }
      }
    ]
  }
}
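
One possible way to get the sums at currency granularity would be to move the Daily/Monthly filter and sum aggregations under the Currency terms aggregation, along the lines of this untested sketch:

GET indexName/_search
{
  "size": 0,
  "aggs": {
    "businessDate": {
      "terms": { "field": "messageData.messageHeader.businessDate", "size": 10 },
      "aggs": {
        "mId": {
          "terms": { "field": "messageData.messageHeader.mId.keyword", "size": 10 },
          "aggs": {
            "Currency": {
              "terms": { "field": "messageData.messageBreakDown.currency.keyword", "size": 10 },
              "aggs": {
                "dailyLocal": {
                  "filter": { "match": { "messageData.messageHeader.messageType": "Daily" } },
                  "aggs": { "Local": { "sum": { "field": "messageData.messageBreakDown.messageDetails.localAmount" } } }
                },
                "dailyConverted": {
                  "filter": { "match": { "messageData.messageHeader.messageType": "Daily" } },
                  "aggs": { "Converted": { "sum": { "field": "messageData.messageBreakDown.messageDetails.convertedAmount" } } }
                },
                "monthlyLocal": {
                  "filter": { "match": { "messageData.messageHeader.messageType": "Monthly" } },
                  "aggs": { "Local": { "sum": { "field": "messageData.messageBreakDown.messageDetails.localAmount" } } }
                },
                "monthlyConverted": {
                  "filter": { "match": { "messageData.messageHeader.messageType": "Monthly" } },
                  "aggs": { "Converted": { "sum": { "field": "messageData.messageBreakDown.messageDetails.convertedAmount" } } }
                }
              }
            }
          }
        }
      }
    }
  }
}

Be aware, though, that messageBreakDown is mapped as a plain object array (not a nested type), so Elasticsearch flattens its values and the summed amounts may not stay correlated with their currency; if that turns out to be the case, the messageBreakDown objects would likely need to be remapped as nested and queried with nested aggregations.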
