You can answer this type of question with aggregations or, if the data is large and arrives continuously, with the new transforms feature in Elasticsearch.
For example:
# Source index: one document per user action (a "span"), keyed by user and
# ordered in time by "timestamp".
PUT spans
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date"
      },
      "action": {
        "type": "keyword"
      },
      "userId": {
        "type": "keyword"
      }
    }
  }
}
# Sample actions for three users. The target index is given in the URL, so each
# action line only needs an empty "index" object.
# The timestamps carry no UTC offset; Elasticsearch reads them as UTC (the
# sample response shows 08:34:03 coming back as 08:34:03.000Z).
POST spans/_bulk
{"index":{}}
{"timestamp":"2019-10-14T08:34:03","action":"A","userId":"userA"}
{"index":{}}
{"timestamp":"2019-10-14T11:39:26","action":"A","userId":"userB"}
{"index":{}}
{"timestamp":"2019-10-14T11:41:23","action":"A","userId":"userA"}
{"index":{}}
{"timestamp":"2019-10-14T12:10:56","action":"B","userId":"userB"}
{"index":{}}
{"timestamp":"2019-10-14T12:51:42","action":"B","userId":"userA"}
{"index":{}}
{"timestamp":"2019-10-14T15:23:48","action":"C","userId":"userA"}
{"index":{}}
{"timestamp":"2019-10-14T16:05:19","action":"B","userId":"userB"}
{"index":{}}
{"timestamp":"2019-10-14T19:44:01","action":"A","userId":"userC"}
{"index":{}}
{"timestamp":"2019-10-15T03:18:15","action":"D","userId":"userA"}
# Pivot "spans" into "transactions": one output document per user holding
#   - min_timestamp / max_timestamp: first and last action time,
#   - duration: max_timestamp - min_timestamp in milliseconds
#     (6.7452E7 in the sample response = 18h44m12s for userA),
#   - trace: the user's {timestamp, action} pairs sorted by time,
#   - signature: the same actions, in time order, concatenated into one string.
# NOTE(review): the group_by key is named "date" but buckets on userId, so the
# output field "date" actually holds the user id (as the sample response shows).
# NOTE(review): newer releases (7.5+) expose this API as PUT _transform/<id>;
# confirm which path your cluster version supports.
PUT _data_frame/transforms/transaction
{
  "source": {
    "index": ["spans"]
  },
  "dest": {
    "index": "transactions"
  },
  "pivot": {
    "group_by": {
      "date": {
        "terms": { "field": "userId" }
      }
    },
    "aggregations": {
      "min_timestamp": {
        "min": { "field": "timestamp" }
      },
      "max_timestamp": {
        "max": { "field": "timestamp" }
      },
      "duration": {
        "bucket_script": {
          "buckets_path": {
            "max": "max_timestamp",
            "min": "min_timestamp"
          },
          "script": "params.max-params.min"
        }
      },
      "trace": {
        "scripted_metric": {
          "init_script": "state.spans = []",
          "map_script": """
            Map span = [
              'timestamp':doc['timestamp'].value,
              'action':doc['action'].value
            ];
            state.spans.add(span)
          """,
          "combine_script": "return state.spans",
          "reduce_script": """
            def ret = [];
            for (s in states) {
              for (span in s) {
                ret.add(span);
              }
            }
            ret.sort((HashMap o1, HashMap o2)->o1['timestamp'].toInstant().toEpochMilli().compareTo(o2['timestamp'].toInstant().toEpochMilli()));
            return ret;
          """
        }
      },
      "signature": {
        "scripted_metric": {
          "init_script": "state.spans = []",
          "map_script": """
            Map span = [
              'timestamp':doc['timestamp'].value,
              'action':doc['action'].value
            ];
            state.spans.add(span)
          """,
          "combine_script": "return state.spans",
          "reduce_script": """
            def ret = [];
            for (s in states) {
              for (span in s) {
                ret.add(span);
              }
            }
            ret.sort((HashMap o1, HashMap o2)->o1['timestamp'].toInstant().toEpochMilli().compareTo(o2['timestamp'].toInstant().toEpochMilli()));
            def signature = "";
            for (span in ret) {
              signature += span['action']
            }
            return signature;
          """
        }
      }
    }
  }
}
# Start the "transaction" transform defined above so it populates the
# "transactions" destination index.
# NOTE(review): newer releases (7.5+) expose this as
# POST _transform/transaction/_start; confirm for your cluster version.
POST _data_frame/transforms/transaction/_start
# Find users whose time-ordered action signature contains A, B, C, D as
# consecutive actions (e.g. "AABCD").
# NOTE(review): targets the ".keyword" sub-field, presumably created by dynamic
# mapping of the string "signature" in the destination index; verify with
# GET transactions/_mapping.
GET transactions/_search
{
  "query": {
    "wildcard": {
      "signature.keyword": {
        "value": "*ABCD*"
      }
    }
  }
}
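The transform pivots the raw data into an entity-centric index with one document per user. The response below lists the whole `transactions` index (three hits, one per user). The wildcard query above would match only userA, whose signature `AABCD` contains `ABCD`; userB's signature is `ABB` and userC's is `A`.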
{
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "transactions",
"_type" : "_doc",
"_id" : "dVXi9btFe7o12AMiSauzNKgAAAAAAAAA",
"_score" : 1.0,
"_source" : {
"date" : "userA",
"duration" : 6.7452E7,
"trace" : [
{
"action" : "A",
"timestamp" : "2019-10-14T08:34:03.000Z"
},
{
"action" : "A",
"timestamp" : "2019-10-14T11:41:23.000Z"
},
{
"action" : "B",
"timestamp" : "2019-10-14T12:51:42.000Z"
},
{
"action" : "C",
"timestamp" : "2019-10-14T15:23:48.000Z"
},
{
"action" : "D",
"timestamp" : "2019-10-15T03:18:15.000Z"
}
],
"signature" : "AABCD",
"min_timestamp" : "2019-10-14T08:34:03.000Z",
"max_timestamp" : "2019-10-15T03:18:15.000Z"
}
},
...
}
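A really useful feature is that transforms can run continuously (i.e. the `transactions` index is automatically updated as new documents are added to `spans`). The transform above runs once as a batch job. To make it continuous, add a `sync` section to its definition, e.g. `"sync": {"time": {"field": "timestamp", "delay": "60s"}}`.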