I'm using Elasticsearch to store event data for web sessions. Each event is its own document. I'm then using transforms to aggregate this data, grouping by a sessionId key and building an eventFlow field that contains the entire user journey. The problem I'm facing is that the events are not going through the transform in the order of their timestamps.
I'll share an example. For the session b3wtJf6y_MlzVYGHAABl, I have the following three events -
{event: newUserConnected, timestamp: Dec 13, 2022 @ 21:11:18.000},
{event: hover, timestamp: Dec 13, 2022 @ 21:14:06.000},
{event: disconnect, timestamp: Dec 13, 2022 @ 21:14:08.000}
But the transformed data for this sessionId has the eventFlow as -
newUserConnected -> disconnect -> hover
Shouldn't the transform be processing the data in the order of their timestamps? Or am I missing something here?
Also adding the pivot for the transform I'm using -
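For context, a minimal sketch of the pivot's shape (assuming sessionId is a keyword field; the scripted metric body is the eventFlow aggregation discussed below):

"pivot": {
  "group_by": {
    "sessionId": {
      "terms": { "field": "sessionId" }
    }
  },
  "aggregations": {
    "eventFlow": {
      "scripted_metric": { ... }
    }
  }
}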
That assumption does not hold, for several reasons. The most important one: Elasticsearch is a distributed system; your data is stored in different shards on different nodes. The order of results for your query can depend on timing. It is not deterministic which node answers first and which second, so you can even get a different order for the same query.
For your scripted metric, that means you have to keep the timestamp in the map and use it to order the events in the reduce script.
Thanks for the insight; it really helped.
This is the eventFlow aggregation I used to finally get the events in the order of their timestamps. Please let me know if there is a simpler way of doing the same.
"eventFlow": {
"scripted_metric": {
"init_script": "state.events = [:];",
"map_script": """
state.events.put(doc.timestamp.value.getMillis().toString() , doc.event.value);
""",
"combine_script": "return state.events",
"reduce_script": """
String eventFlow = '';
def dateArray = [];
for (state in states){
for (date in state.keySet()){
dateArray.add(Long.parseLong(date));
}
dateArray.sort((a, b) -> a>b? 1: -1);
for (date in dateArray){
eventFlow += state[date.toString()] + ' -> ';
}
}
int len = eventFlow.length();
eventFlow = eventFlow.substring(0, len-4);
return eventFlow;
"""
}
}
I think your script won't work across shards: every shard sends a state object, so the first thing you need to do is merge all the states into one. Your script, however, iterates over the states, sorts the events for one shard, and then appends them to the global eventFlow string. If you experiment with an index that has more than one shard, I think you will find examples where the event flow isn't properly sorted.
You first need to merge all your states into one joined map. See the following code snippet (it uses streams instead of a for loop, but that's just syntactic sugar):
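A sketch of that approach for the reduce script, assuming timestamps are unique within a session so the map keys don't collide:

// reduce_script: first merge every shard's state map into one joined map
def joined = states.stream()
    .flatMap(shardState -> shardState.entrySet().stream())
    .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
// then sort all timestamps globally and build the flow string once
return joined.keySet().stream()
    .sorted((a, b) -> Long.compare(Long.parseLong(a), Long.parseLong(b)))
    .map(key -> joined[key])
    .collect(Collectors.joining(' -> '));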
You're right. I hadn't used an index with multiple shards so far, as we're still in the dev stage. But I tested it out by creating an index with 3 shards, filling it with some dummy data, and running the transform over it - most of the eventFlows had the wrong order. Using your code in the reduce script section brought everything back to normal.
I'm also not familiar with a lot of the syntactic sugar that you've used here (primarily because I haven't worked much with Java), but I'll look into all the functions that you've used.
Thanks a lot, mate!
Posting the final eventFlow aggregation for any future readers -
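A sketch of that final version (a reconstruction, not the verbatim original): it keeps the same init/map/combine scripts as before and uses the merge-then-sort reduce script, assuming timestamps are unique within a session.

"eventFlow": {
  "scripted_metric": {
    "init_script": "state.events = [:];",
    "map_script": """
      // key each event by its timestamp in epoch millis
      state.events.put(doc.timestamp.value.getMillis().toString(), doc.event.value);
    """,
    "combine_script": "return state.events",
    "reduce_script": """
      // merge every shard's state into one map before sorting
      def joined = states.stream()
          .flatMap(shardState -> shardState.entrySet().stream())
          .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
      // sort all timestamps globally and build the flow string once
      return joined.keySet().stream()
          .sorted((a, b) -> Long.compare(Long.parseLong(a), Long.parseLong(b)))
          .map(key -> joined[key])
          .collect(Collectors.joining(' -> '));
    """
  }
}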