Hey, I am trying to implement a query that calculates the difference between two timestamps, but filtered on a field. The solution from "Getting duration by using bucket script" works well, but my use case is the following:
I have what I'll call transactions, which arrive with the same id but at different timestamps. Each transaction with the same id comes with a different hostname
field. To illustrate with an example:
# transaction 1
{
"id": "1234-5678-900",
"@timestamp": "2021-09-23T14:13:50.000Z",
"host.name": "alpha-xxxxx"
}
# transaction 2
{
"id": "1234-5678-900",
"@timestamp": "2021-09-23T14:15:15.000Z",
"host.name": "beta-xxxxx"
}
What I need to do, based on the link above, is:
PUT _transform/_preview
{
"source": {
"index": "mysourceindex"
},
"dest": {
"index": "mydestindext"
},
"pivot": {
"group_by": {
"id": {
"terms": {
"field": "id"
}
}
},
"aggregations": {
"time_frame.lte": {
"max": {
"field": "timestamp"
}
# and hostname startswith('beta')
},
"time_frame.gte": {
"min": {
"field": "timestamp"
}
# and hostname startswith('alpha')
},
"time_length": {
"bucket_script": {
"buckets_path": {
"min": "time_frame.gte.value",
"max": "time_frame.lte.value"
},
"script": "params.max - params.min"
}
}
}
}
}
What I tried:
POST _transform/_preview
{
"source": {
"index": "mysourceindex"
},
"dest": {
"index": "mydestindex"
},
"pivot": {
"group_by": {
"id": {
"terms": {
"field": "id"
}
}
},
"aggregations": {
"latest_value": {
"scripted_metric": {
"init_script": "state.timestamp_latest = 0L; state.last_value = ''",
"map_script": """
def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
if (current_date > state.timestamp_latest && doc['host']['name'] =~ /beta/)
{state.timestamp_latest = current_date;}
""",
"combine_script": "return state",
"reduce_script": """
def last_value = '';
def timestamp_latest = 0L;
for (s in states) {if (s.timestamp_latest > (timestamp_latest))
{timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
return last_value
"""
}
},
"first_value": {
"scripted_metric": {
"init_script": "state.timestamp_first = 99999999999999999999999L; state.first_value = ''",
"map_script": """
def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
if (current_date < state.timestamp_first && doc['host']['name'] =~ /beta/)
{state.timestamp_first = current_date;}
""",
"combine_script": "return state",
"reduce_script": """
def first_value = '';
def timestamp_first = 0L;
for (s in states) {if (s.timestamp_first < (timestamp_first))
{timestamp_first = s.timestamp_first; first_value = s.first_value;}}
return first_value
"""
}
}
,
"time_length": {
"bucket_script": {
"buckets_path": {
"min": "first_value.value",
"max": "last_value.value"
},
"script": "(params.max - params.min)/1000"
}
}
}
}
}
But I am getting:
{
"error" : {
"root_cause" : [
{
"type" : "action_request_validation_exception",
"reason" : "Validation Failed: 1: No aggregation found for path [last_value.value];"
}
],
"type" : "action_request_validation_exception",
"reason" : "Validation Failed: 1: No aggregation found for path [last_value.value];"
},
"status" : 400
}