Order of documents when using a transform

I'm using Elasticsearch to store the event data for web sessions. Each event is its own document. I'm then using transforms to aggregate this data, grouping by a sessionId key and building an eventFlow field that contains the entire user journey. The problem I'm facing is that the events are not going through the transform in the order of their timestamps.

I'll share an example. For the session b3wtJf6y_MlzVYGHAABl, I have the following three events -

{event: newUserConnected, timestamp: Dec 13, 2022 @ 21:11:18.000},
{event: hover, timestamp: Dec 13, 2022 @ 21:14:06.000},
{event: disconnect, timestamp: Dec 13, 2022 @ 21:14:08.000}

But the transformed data for this sessionId has the eventFlow as -

newUserConnected -> disconnect -> hover

Shouldn't the transform be processing the data in the order of their timestamps? Or am I missing something here?

Also adding the pivot for the transform I'm using -

{
    "group_by": {
      "sessionId": {
        "terms": {
          "field": "sessionId"
        }
      }
    },
    "aggregations": {
      "eventFlow": {
        "scripted_metric": {
          "init_script": "state.allEvents = [];",
          "map_script": """
            state.allEvents.add(doc.event.value)
          """,
          "combine_script": "return state.allEvents",
          "reduce_script": """
            String eventFlow = ''; 
            for (state in states){ 
              for (a in state){
                eventFlow += a.toString() + ' -> '
              }
            } 
            return eventFlow
          """
        }
      }
    }
  }

That assumption does not hold, for several reasons. The most important one: Elasticsearch is a distributed system, and your data is stored in different shards on different nodes. The order of documents in a query result can depend on timing; it is not deterministic which node answers first and which answers second. You can therefore even get a different order for the same query.

For your scripted metric, that means you have to keep the timestamp in the map script and use it to order the events in the reduce script.


Thanks for the insight, it really helped.
This is the eventFlow aggregation I used to finally get the event flow in the order of the timestamps; please let me know if there is a simpler way of doing the same.

"eventFlow": {
        "scripted_metric": {
          "init_script": "state.events = [:];",
          "map_script": """
            state.events.put(doc.timestamp.value.getMillis().toString() , doc.event.value);
          """,
          "combine_script": "return state.events",
          "reduce_script": """
            String eventFlow = '';
            def dateArray = [];
            for (state in states){ 
              for (date in state.keySet()){
                dateArray.add(Long.parseLong(date));
              }
              dateArray.sort((a, b) -> a>b? 1: -1);
              for (date in dateArray){
                  eventFlow += state[date.toString()] + ' -> ';
              }
            } 
            int len = eventFlow.length();
            eventFlow = eventFlow.substring(0, len-4);
            return eventFlow;
          """
        }
      }

I think your script won't work across shards: basically every shard sends a state object, so the first thing you need to do is merge all states into one. However, your script iterates over the states and sorts the events for one shard, and then adds them to the global eventFlow string. I think if you experiment with an index that has more than one shard, you will find examples where the event flow isn't properly sorted.

You first need to merge all your states into one joined map. See the following code snippet (it uses streams instead of a for loop, but that's just syntactic sugar 🙂):

            def joinedMap = states.stream()
              .flatMap(s -> s.entrySet().stream())
              .collect(Collectors.toMap(
                e -> Long.parseLong(e.getKey()),
                Map.Entry::getValue));

Now you have one map whose keys are the timestamps (as epoch longs) and whose values are your events.

This isn't sorted yet; I suggest sorting the stream, which avoids the extra variable. For joining the string you can use Collectors.joining:

            return joinedMap.entrySet().stream()
              .sorted((o1, o2) -> o1.getKey().compareTo(o2.getKey()))
              .map(d -> d.getValue())
              .collect(Collectors.joining(" -> "));
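
If you want to experiment with these stream calls outside of Painless, the following is roughly the same merge-and-sort logic as a small standalone Java program. This is only a sketch: the timestamps and event names are made-up sample data, and the explicit Java types take the place of Painless's def, but flatMap, Collectors.toMap, sorted and Collectors.joining behave the same way:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class EventFlowDemo {
    public static void main(String[] args) {
        // Stand-ins for the per-shard "states": one map per shard, keyed by the
        // event timestamp (epoch millis as a string), as returned by the combine
        // script. The keys and values here are made-up sample data.
        List<Map<String, String>> states = List.of(
            Map.of("1000", "newUserConnected", "3000", "disconnect"),
            Map.of("2000", "hover"));

        // Merge all per-shard maps into one map keyed by the numeric timestamp.
        Map<Long, String> joinedMap = states.stream()
            .flatMap(s -> s.entrySet().stream())
            .collect(Collectors.toMap(
                e -> Long.parseLong(e.getKey()),
                Map.Entry::getValue));

        // Sort by timestamp and join the event names into one string.
        String eventFlow = joinedMap.entrySet().stream()
            .sorted((o1, o2) -> o1.getKey().compareTo(o2.getKey()))
            .map(e -> e.getValue())
            .collect(Collectors.joining(" -> "));

        System.out.println(eventFlow); // newUserConnected -> hover -> disconnect
    }
}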

You're right. I hadn't used any index with multiple shards so far, as we're still in the dev stage. But I tested it out by creating an index with 3 shards, filling it with some dummy data, and running the transform over it; most of the eventFlows indeed had the wrong order. Using your code in the reduce script brought the ordering back to normal.
I'm also not familiar with a lot of the syntactic sugar that you've used here (primarily because I haven't worked much with Java), but I'll look into all the functions that you've used.
Thanks a lot, mate!
Posting the final eventFlow aggregation for any future readers -

"eventFlow": {
        "scripted_metric": {
          "init_script": "state.events = [:];",
          "map_script": """
          def event = '';
          if(!doc.element.empty){
            event = doc.event.value + " on " + doc.element.value + " with value " + doc.label.value;
          }
          else{
            event = doc.event.value ;
          }
            state.events.put(doc.timestamp.value.getMillis().toString() , event);
          """,
          "combine_script": "return state.events",
          "reduce_script": """
            def joinedMap = states.stream().flatMap(
              s -> s.entrySet().stream())
              .collect(Collectors.toMap
              (e-> Long.parseLong(e.getKey()),
              Map.Entry::getValue
              ));
            return joinedMap.entrySet().stream().sorted(
              (o1, o2) -> o1.getKey().compareTo(o2.getKey()))
              .map(d -> d.getValue()).collect(Collectors.joining(" -> "));
          """
        }
      },
