Calculate time difference between two timestamps and filter on a field

Hey, I am trying to implement a query that calculates the difference between two timestamps, where each timestamp is selected by filtering on a field. The documented solution "Getting duration by using bucket script" works well, but my use case is the following:
I have documents (let's call them transactions) that arrive with the same id but at different timestamps; each transaction with the same id comes with a different hostname field. To illustrate:

# transaction 1
{
  "id": "1234-5678-900",
  "@timestamp": "2021-09-23T14:13:50.000Z",
  "host.name": "alpha-xxxxx"
}
# transaction 2
{
  "id": "1234-5678-900",
  "@timestamp": "2021-09-23T14:15:15.000Z",
  "host.name": "beta-xxxxx"
}

What I need to do, based on the link above, is:

# Desired transform preview (a sketch, not a runnable request): group documents
# by `id`, take the max and the min timestamp per id, and subtract them in a
# bucket_script. The two TODO comments below mark the per-host filters that
# still need to be expressed.
# NOTE(review): the later attempt calls this same endpoint with POST rather
# than PUT — confirm which HTTP method the preview API accepts.
PUT _transform/_preview
{
  "source": {
    "index": "mysourceindex"
  },
  # NOTE(review): "mydestindext" looks like a typo for "mydestindex" (the name
  # used in the later attempt) — confirm.
  "dest": {
    "index": "mydestindext"
  },
  "pivot": {
    # One output bucket per distinct `id`.
    "group_by": {
     "id": {
        "terms": {
          "field": "id"
        }
      }
    },
    "aggregations": {
      # Upper bound of the time frame: latest timestamp within the id bucket.
      # NOTE(review): the sample documents name the field "@timestamp", not
      # "timestamp" — confirm which one the index actually uses.
      "time_frame.lte": {
        "max": {
          "field": "timestamp"
        }
        # TODO: restrict this max to documents whose host.name starts with 'beta'
      },
      # Lower bound of the time frame: earliest timestamp within the id bucket.
      "time_frame.gte": {
        "min": {
          "field": "timestamp"
        }
        # TODO: restrict this min to documents whose host.name starts with 'alpha'
      },
      # Duration = max - min, read from the two sibling aggregations above.
      # NOTE(review): wrapping each min/max in a `filter` aggregation and
      # pointing buckets_path at the nested metric may express the TODOs —
      # verify that transforms support this before relying on it.
      "time_length": { 
        "bucket_script": {
          "buckets_path": { 
            "min": "time_frame.gte.value",
            "max": "time_frame.lte.value"
          },
          "script": "params.max - params.min" 
        }
      }
    }
  }
}

What I tried:

# Attempted transform preview: groups documents by `id` and tries to obtain the
# latest and earliest `@timestamp` per id with two scripted_metric
# aggregations, then divides their difference by 1000 in a bucket_script.
POST _transform/_preview
{
  "source": {
    "index": "mysourceindex"
  },
  "dest": {
    "index": "mydestindex"
  },
  "pivot": {
    # One output bucket per distinct `id`.
    "group_by": {
      "id": {
        "terms": {
          "field": "id"
        }
      }
    },
    "aggregations": {
      # map_script keeps the largest epoch-millis `@timestamp` among documents
      # whose host name matches /beta/ in state.timestamp_latest.
      # NOTE(review): state.last_value is initialised to '' and never assigned
      # afterwards, and reduce_script returns last_value, so this aggregation
      # yields '' rather than the timestamp that was tracked.
      # NOTE(review): the sample documents show the field as "host.name";
      # confirm that doc['host']['name'] is a valid way to read it.
      "latest_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_value = ''",
          "map_script": """
            def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest && doc['host']['name'] =~ /beta/)
            {state.timestamp_latest = current_date;}
          """,
      "combine_script": "return state",
      "reduce_script": """
        def last_value = '';
        def timestamp_latest = 0L;
        for (s in states) {if (s.timestamp_latest > (timestamp_latest))
        {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
        return last_value
      """
    }
  },
      # map_script keeps the smallest epoch-millis `@timestamp` among documents
      # whose host name matches /beta/ in state.timestamp_first.
      # NOTE(review): the stated goal is the minimum over hosts starting with
      # 'alpha', but this script also filters on /beta/ — confirm intent.
      # NOTE(review): the sentinel 99999999999999999999999L is larger than the
      # maximum 64-bit long (9223372036854775807) — verify the script compiles.
      # NOTE(review): reduce_script starts timestamp_first at 0L and only
      # replaces it with a smaller value, so positive epoch timestamps never
      # win; state.first_value is also never assigned, so '' is returned.
      "first_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_first = 99999999999999999999999L; state.first_value = ''",
          "map_script": """
            def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date < state.timestamp_first && doc['host']['name'] =~ /beta/)
            {state.timestamp_first = current_date;}
          """,
      "combine_script": "return state",
      "reduce_script": """
        def first_value = '';
        def timestamp_first = 0L;
        for (s in states) {if (s.timestamp_first < (timestamp_first))
        {timestamp_first = s.timestamp_first; first_value = s.first_value;}}
        return first_value
      """
    }
  }
      ,
      # Difference between the two metrics, divided by 1000.
      # NOTE(review): "max" points at "last_value", but the aggregation defined
      # above is named "latest_value"; this matches the reported validation
      # error "No aggregation found for path [last_value.value]".
      # NOTE(review): confirm that a scripted_metric result can be referenced
      # through a ".value" buckets_path and that it is numeric.
      "time_length": { 
        "bucket_script": {
          "buckets_path": { 
            "min": "first_value.value",
            "max": "last_value.value"
          },
          "script": "(params.max - params.min)/1000" 
        }
      }
    }
  }
}

But I am getting the following error:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "action_request_validation_exception",
        "reason" : "Validation Failed: 1: No aggregation found for path [last_value.value];"
      }
    ],
    "type" : "action_request_validation_exception",
    "reason" : "Validation Failed: 1: No aggregation found for path [last_value.value];"
  },
  "status" : 400
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.