Aggregate and query transactions from indexed events

Hey there,

Currently, I am indexing events from a message service in Elasticsearch. My aim is to aggregate those events into whole transactions and, in addition, to query these transactions by several criteria.
This is where I am having trouble. I created the following, very simplified example to illustrate my situation.

PUT test
        {
          "mappings": {
            "event": { 
              "properties": { 
                "timeStamp": {"type" : "long"},
                "eventId": {"type" : "keyword"},
                "eventType": {"type" : "keyword"},
                "transactionId": {"type" : "keyword"}
                }
              }
            }
        }

        PUT /test/event/e1
        {
            "timeStamp": 1535024097718,
            "eventId": "e1",
            "eventType": "MessageSubmitEvent",
            "specificInfo": {},
            "transactionId": "t1"
        }

        PUT /test/event/e2
        {
            "timeStamp": 1535024097800,
            "eventId": "e2",
            "eventType": "DeliveryStatusEvent",
            "specificInfo": {
              "deliverySuccesful": "false"
            },
            "transactionId": "t1"
        }

        PUT /test/event/e3
        {
            "timeStamp": 1535024099718,
            "eventId": "e1",
            "eventType": "MessageSubmitEvent",
            "specificInfo": {},
            "transactionId": "t2"
        }

        PUT /test/event/e4
        {
            "timeStamp": 1535024099800,
            "eventId": "e2",
            "eventType": "DeliveryStatusEvent",
            "specificInfo": {
              "deliverySuccesful": "true"
            },
            "transactionId": "t2"
        }

Now I would like to assemble t1 and t2 as whole transactions, and also query the result by successful delivery.
I have come this far:

GET /test/event/_search
{
  "size":0,
  "aggs": {
    "group_by_transactionId": {
      "terms": {
        "field": "transactionId"
      },
      "aggs": {
        "eventType": {
          "terms": {
            "field": "eventType"
          },
          "aggs": {
           "deliveryStatus": {
            "terms": {
             "field": "specificInfo.deliverySuccesful.keyword"
          }
        }
      }
        }
      }
    }
  }
}

The result is:

{
  "took": 12,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "group_by_transactionId": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "t1",
          "doc_count": 2,
          "eventType": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": "DeliveryStatusEvent",
                "doc_count": 1,
                "deliveryStatus": {
                  "doc_count_error_upper_bound": 0,
                  "sum_other_doc_count": 0,
                  "buckets": [
                    {
                      "key": "false",
                      "doc_count": 1
                    }
                  ]
                }
              },
              {
                "key": "MessageSubmitEvent",
                "doc_count": 1,
                "deliveryStatus": {
                  "doc_count_error_upper_bound": 0,
                  "sum_other_doc_count": 0,
                  "buckets": []
                }
              }
            ]
          }
        },
        {
          "key": "t2",
          "doc_count": 2,
          "eventType": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": "DeliveryStatusEvent",
                "doc_count": 1,
                "deliveryStatus": {
                  "doc_count_error_upper_bound": 0,
                  "sum_other_doc_count": 0,
                  "buckets": [
                    {
                      "key": "true",
                      "doc_count": 1
                    }
                  ]
                }
              },
              {
                "key": "MessageSubmitEvent",
                "doc_count": 1,
                "deliveryStatus": {
                  "doc_count_error_upper_bound": 0,
                  "sum_other_doc_count": 0,
                  "buckets": []
                }
              }
            ]
          }
        }
      ]
    }
  }
}

Can anyone help me with that? Is it actually possible? It seems like filtering mainly supports numeric values, which doesn't help me much here.
Thanks in advance for any kind of help!

This video explains why this is hard on an "event-centric" log index, and presents a solution that creates an entity-centric index from these logs: https://www.youtube.com/watch?v=yBf7oeJKH2Y

The example solution in the video is dated in that it uses Groovy rather than Painless, but the principles still hold. Example Painless scripts that work with 6.3+ are here

Thanks for replying!
I didn't say it is hard if you do further scripting. I was hoping there would be an easy solution that I had simply overlooked.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.