Calculate timestamp duration to next document

Hi, thanks for your answer. I have already tried transform aggregations and buckets, but I can't get it to calculate something for every single document:

POST _transform/_preview
{
  "source": {
    "index": "log-*"
  },
  "dest": {
    "index": "destindex"
  },
  "pivot": {
    "group_by": {
      "name": {
        "terms": {
          "field": "@t"
        }
      }
    },
    "aggregations": {
      "latest_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L;",
          "map_script": """
          def current_date = doc['@t'].getValue().toInstant().toEpochMilli();
          if (current_date > state.timestamp_latest)
          {state.timestamp_latest = current_date;}
        """,
          "combine_script": "return state",
          "reduce_script": """
          def last_doc = '';
          def timestamp_latest = 0L;
          for (s in states) {if (s.timestamp_latest > (timestamp_latest))
          {timestamp_latest = s.timestamp_latest;}}
          return timestamp_latest
        """
        }
      },
      "first_value": {
         "scripted_metric": {
          "init_script": "state.timestamp_first = 999999999999999L;",
          "map_script": """
          def current_date = doc['@t'].getValue().toInstant().toEpochMilli();
          if (current_date < state.timestamp_first)
          {state.timestamp_first = current_date;}
        """,
          "combine_script": "return state",
          "reduce_script": """
          def last_doc = '';
          def timestamp_first = 999999999999999L;
          for (s in states) {if (s.timestamp_first < (timestamp_first))
          {timestamp_first = s.timestamp_first;}}
          return timestamp_first
        """
        }
      },
      "time_length": {
        "bucket_script": {
          "buckets_path": {
            "min": "first_value.value",
            "max": "latest_value.value"
          },
          "script": "(params.max - params.min)/1000"
        }
      }
    }
  }
}
The preview returns one bucket per timestamp, each with a duration of 0:

[
  {
    "duration" : 0.0,
    "@t" : "2021-11-08T07:09:14.620Z",
    "@timestamp_min" : "2021-11-08T07:09:14.620Z",
    "@timestamp_max" : "2021-11-08T07:09:14.620Z"
  },
  {
    "duration" : 0.0,
    "@t" : "2021-11-08T07:09:14.668Z",
    "@timestamp_min" : "2021-11-08T07:09:14.668Z",
    "@timestamp_max" : "2021-11-08T07:09:14.668Z"
  },
  {
    "duration" : 0.0,
    "@t" : "2021-11-08T07:09:14.669Z",
    "@timestamp_min" : "2021-11-08T07:09:14.669Z",
    "@timestamp_max" : "2021-11-08T07:09:14.669Z"
  },
  {
    "duration" : 0.0,
    "@t" : "2021-11-08T07:09:14.670Z",
    "@timestamp_min" : "2021-11-08T07:09:14.670Z",
    "@timestamp_max" : "2021-11-08T07:09:14.670Z"
  },
  {
    "duration" : 0.0,
    "@t" : "2021-11-08T07:09:14.782Z",
    "@timestamp_min" : "2021-11-08T07:09:14.782Z",
    "@timestamp_max" : "2021-11-08T07:09:14.782Z"
  },
  {
    "duration" : 0.0,
    "@t" : "2021-11-08T07:09:14.783Z",
    "@timestamp_min" : "2021-11-08T07:09:14.783Z",
    "@timestamp_max" : "2021-11-08T07:09:14.783Z"
  },
  {
    "duration" : 0.0,
    "@t" : "2021-11-08T07:09:17.005Z",
    "@timestamp_min" : "2021-11-08T07:09:17.005Z",
    "@timestamp_max" : "2021-11-08T07:09:17.005Z"
  }
]

First, what I meant was the Logstash aggregate filter, although that approach could be a bit complicated.

The scripted metric aggregation in Elasticsearch works only bucket by bucket; it cannot compute differences between buckets.

In addition, the derivative aggregation only works inside histogram / date_histogram aggregations and cannot be used on a terms aggregation.
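A small offline illustration of why grouping by @t always yields a duration of 0: each bucket contains exactly one timestamp, and a metric aggregation cannot look into the neighbouring bucket. This is a plain-Python sketch, not Painless; the epoch-millisecond values are the first @t values of the preview above (07:09:14.620 to 07:09:14.670), and the helper name bucket_durations is made up for the example.

```python
from collections import defaultdict

# First @t values of the preview above, as epoch milliseconds
# (2021-11-08T07:09:14.620Z, .668Z, .669Z, .670Z).
timestamps = [1636355354620, 1636355354668, 1636355354669, 1636355354670]


def bucket_durations(values, key):
    """Group values by key(value) and return max - min for every bucket,
    similar to a terms group_by followed by min/max metrics."""
    buckets = defaultdict(list)
    for v in values:
        buckets[key(v)].append(v)
    return {k: max(vs) - min(vs) for k, vs in buckets.items()}


# Grouping by the timestamp itself: one value per bucket, so always 0.
per_timestamp = bucket_durations(timestamps, key=lambda t: t)
print(per_timestamp)

# Only when the events share a bucket does the gap become visible.
single_bucket = bucket_durations(timestamps, key=lambda t: "same-thread")
print(single_bucket)  # {'same-thread': 50}
```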

My reply was about aggregations in general. For the Logstash aggregate filter itself I haven't found anything suitable so far, and @@map doesn't seem to be the right fit either...

If you want to solve this with a transform, you have to co-locate the docs accordingly. I assume you have many processes and threads that overlap each other, so I think you should group by ProcessId and ThreadId. This brings the events of each process and thread together. Using a scripted_metric, you can then calculate a duration between every step.
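A plain-Python sketch (not Painless) of why co-locating by ProcessId and ThreadId matters: if the steps of several threads are interleaved, the differences between adjacent events mix up different threads. The events below are hypothetical; only the field names and the ThreadIds 29 and 32 are borrowed from the outputs in this thread, and step_durations is a made-up helper.

```python
from collections import defaultdict

# Hypothetical interleaved events of two threads in the same process.
# The epoch_milli values are invented for illustration.
events = [
    {"ProcessId": 32344, "ThreadId": 29, "epoch_milli": 1000},
    {"ProcessId": 32344, "ThreadId": 32, "epoch_milli": 1005},
    {"ProcessId": 32344, "ThreadId": 29, "epoch_milli": 1010},
    {"ProcessId": 32344, "ThreadId": 32, "epoch_milli": 1040},
    {"ProcessId": 32344, "ThreadId": 29, "epoch_milli": 1100},
]


def step_durations(times):
    """Differences between consecutive sorted timestamps; first step is 0."""
    ordered = sorted(times)
    return [0] + [b - a for a, b in zip(ordered, ordered[1:])]


# Without grouping, steps of different threads are mixed together.
mixed = step_durations(e["epoch_milli"] for e in events)
print(mixed)  # [0, 5, 5, 30, 60]

# Grouping by (ProcessId, ThreadId) co-locates the steps of each thread.
groups = defaultdict(list)
for e in events:
    groups[(e["ProcessId"], e["ThreadId"])].append(e["epoch_milli"])
per_thread = {k: step_durations(v) for k, v in groups.items()}
print(per_thread)
```

Each group then corresponds to one output document of the transform, which is why the result is one document per process/thread.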

However, the result is one document per group. For example, this output would be possible:

{
  "processId": 46264,
  "ThreadId": 38,
  "events": [
    {
      "@t": "2022-01-05T08:40:41.3920240Z",
      "@mt": "No type was specified for the decimal column '{property}' on entity type '{entityType}'. This will cause values to be silently truncated if they do not fit in the default precision and scale. Explicitly specify the SQL server column type that can accommodate all the values using 'HasColumnType()'.",
      "@l": "Warning",
      "property": "ZFAGSUMME",
      "entityType": "Info",
      ...
      "Authenticated": false,
      "UserLanguage": null,
      "time_taken": 2
    },
    {
      "@t": "2022-01-05T08:40:41.3944267Z",
      "@mt": "No type was specified for the decimal column '{property}' on entity type '{entityType}'. This will cause values to be silently truncated if they do not fit in the default precision and scale. Explicitly specify the SQL server column type that can accommodate all the values using 'HasColumnType()'.",
      "@l": "Warning",
      "property": "ZFVTRPRAEMIE",
      ...
      "time_taken": 23
    },
    ...
  ]
}

Unfortunately, there is no functionality to split the array back into single documents. Do you need single documents, or would a collapsed view work as well?

Do you mean like this? When I group by these terms:

"group_by": {
      "ProcessId": {
        "terms": {
          "field": "ProcessId"
        }
      },
      "ThreadId": {
        "terms": {
          "field": "ThreadId"
        }
      }
    },

But then I get only a few results:

[
  {
    "time_length" : 0.006,
    "ThreadId" : 19,
    "first_value" : 1636355357570,
    "latest_value" : 1636355357576,
    "ProcessId" : 32344
  },
  {
    "time_length" : 2.241,
    "ThreadId" : 29,
    "first_value" : 1636355354782,
    "latest_value" : 1636355357023,
    "ProcessId" : 32344
  },
  {
    "time_length" : 0.024,
    "ThreadId" : 30,
    "first_value" : 1636355359514,
    "latest_value" : 1636355359538,
    "ProcessId" : 32344
  },
  {
    "time_length" : 0.05,
    "ThreadId" : 32,
    "first_value" : 1636355354620,
    "latest_value" : 1636355354670,
    "ProcessId" : 32344
  },
  {
    "time_length" : 2.054,
    "ThreadId" : 33,
    "first_value" : 1636355359769,
    "latest_value" : 1636355361823,
    "ProcessId" : 32344
  },
  {
    "time_length" : 0.003,
    "ThreadId" : 74,
    "first_value" : 1636356845549,
    "latest_value" : 1636356845552,
    "ProcessId" : 32344
  }
]

I would like to have a calculation for each step, not just one per thread.

I would like to have it for each document individually, so that I can build a table like this:

You can concatenate the individual events using a scripted metric, as in this post: Concatenating array objects in elasticsearch transform aggregations

The script needs to be extended to sort the events by time; then you can iterate over the list and add duration fields.

Unfortunately, I don't see a way to flatten the structure. By "flatten" I mean breaking the group_by buckets back into individual documents.

So it is only possible to calculate a duration from the first to the last document?

With a script it is possible to calculate the duration between two adjacent events. That means you can't use min and max aggregations; you have to implement this in Painless. I gave an example output in my first post, and I am certain that this can be done with a script.
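To make the difference concrete, a plain-Python sketch: min/max aggregations give one number per group, while the per-step durations need the differences between adjacent sorted events. The values are the first four @t timestamps from the preview earlier in the thread, assumed here (for illustration only) to belong to the same process and thread.

```python
# Epoch milliseconds of 07:09:14.620, .668, .669 and .670 (2021-11-08, UTC).
times = [1636355354620, 1636355354668, 1636355354669, 1636355354670]

# What min/max aggregations give you: one number per group.
total = max(times) - min(times)

# What the per-step script has to compute: one number per adjacent pair.
ordered = sorted(times)
steps = [b - a for a, b in zip(ordered, ordered[1:])]

print(total)  # 50
print(steps)  # [48, 1, 1]
```

The step durations add up to the min/max total, but the total alone cannot be split back into steps.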

So the example result you gave looks good, but I'm not sure what you mean regarding the implementation. This post doesn't really help me: https://discuss.elastic.co/t/concatenating-array-objects-in-elasticsearch-transform-aggregations/289980

So far I have only tested this, without success:

Your reducer returns an empty list as you are not concatenating anything:

def docs = []; return docs

In order to return something you need to collect the docs.

Think of it like this:

map is executed on every shard for every document. You add the doc to a list.
combine is executed once per shard; you return the list, so you end up with one list per shard.
reduce is executed exactly once per search. You get the lists from every shard and need to combine them into one output.
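The three phases can be simulated in plain Python to see the data flow (this is not the Painless API; the function names mirror the script names and the shard contents are hypothetical). The concatenated list follows shard order, not time order, which is why the events still have to be sorted before durations are calculated. A reducer that only creates an empty list and returns it, like def docs = []; return docs, would never touch the states and always return an empty list.

```python
def init_script():
    # One fresh state per shard.
    return {"docs": []}


def map_script(state, source):
    # Runs for every matching document on a shard; copies the doc,
    # similar to new HashMap(params['_source']).
    state["docs"].append(dict(source))


def combine_script(state):
    # Runs once per shard: return that shard's list.
    return state["docs"]


def reduce_script(states):
    # Runs once per search: concatenate the per-shard lists.
    docs = []
    for s in states:
        for d in s:
            docs.append(d)
    return docs


def run_scripted_metric(shards):
    states = []
    for shard_docs in shards:
        state = init_script()
        for source in shard_docs:
            map_script(state, source)
        states.append(combine_script(state))
    return reduce_script(states)


# Hypothetical distribution of three events over two shards.
shards = [
    [{"@t": "2021-11-08T07:09:14.620Z"}, {"@t": "2021-11-08T07:09:14.669Z"}],
    [{"@t": "2021-11-08T07:09:14.668Z"}],
]
result = run_scripted_metric(shards)
print([d["@t"] for d in result])  # shard order, not time order
```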

It seems you are a German speaker, so I suggest this detailed walk-through with example code and documentation: Dec 5th, 2019 [DE] Weihnachtswünsche zusammenfassen mit Transforms


Yes, the German instructions helped me a lot :grinning:. I now have my expected result in an array:

POST log-sup/_search?size=0
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "time_taken": {
      "scripted_metric": {
        "init_script": "state.docs = []",
        "map_script": "state.docs.add(new HashMap(params['_source']))",
        "combine_script": "return state.docs",
        "reduce_script": """
        def docs = [];

        for (s in states) {
          for (d in s) {
            // Convert the date to epoch milliseconds
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
            LocalDateTime formatted = LocalDateTime.parse(d['@t'].substring(0, 23), formatter);
            
            // Add the epoch milliseconds to the array
            docs.add(formatted.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli());
          }
        }
        
        // Sort by earliest date (bubble sort)
        for (def run = 0; run < docs.length; run++) {
          for (def i2 = 0; i2 < docs.length - 1; i2++) {
            if (docs[i2] > docs[i2 + 1]) {
              // Swap elements
              def index1 = i2; def index2 = i2 + 1;
              def valueIndex1 = docs[index1];
              docs[index1] = docs[index2];
              docs[index2] = valueIndex1;
            }
          }
        }
        
        // New array for the time differences
        long[] duration = new long[docs.length];
        
        def lastEpochMilli = 0; def index = 0;

        // Calculate the time difference to the previous event
        for (def value : docs) {
          duration[index] = index == 0 ? 0 : (value - lastEpochMilli);
          lastEpochMilli = value;
          index++;
        }
        
        return duration;
        """
      }
    }
  }
}

Result:

{
  "took" : 14,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 52,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "time_taken" : {
      "value" : [
        0,
        48,
        0,
        1,
        1,
        112,
        1,
        0,
        2222,
        4,
        0,
        1,
        0,
        1,
        0,
        1,
        0,
        0,
        11,
        547,
        3,
        0,
        2,
        1,
        1938,
        2,
        0,
        1,
        0,
        1,
        1,
        1,
        1,
        2,
        1,
        2,
        2,
        1,
        2,
        2,
        1,
        2,
        2,
        231,
        2051,
        1,
        1,
        0,
        1,
        1483726,
        3,
        0
      ]
    }
  }
}

How can I now add this to the respective documents?

I suggest keeping the documents as shown in the advent calendar post and adding a time_taken field to every document except the first one (or adding 0 for that one). The d in for (d in s) is just a hash map, so you can simply call .put('time_taken', duration) on it.

I'm still a little confused. Do I need to create a continuous transform now? If so, what should I group by?

I have now implemented it the way I understood it, but the result makes relatively little sense...

POST _transform/_preview
{
  "source": {
    "index": "log-*"
  },
  "dest": {
    "index": "destindex"
  },
  "pivot": {
    "group_by": {
      "name": {
        "terms": {
          "field": "@t"
        }
      }
    },
    "aggregations": {
      "time_taken": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": "state.docs.add(new HashMap(params['_source']))",
          "combine_script": "return state.docs",
          "reduce_script": """
        def docs = [];

        for (s in states) {
          for (d in s) {
            // Convert the date to epoch milliseconds
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
            LocalDateTime formatted = LocalDateTime.parse(d['@t'].substring(0, 23), formatter);
            
            // Add the epoch milliseconds to the array
            docs.add(formatted.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli());
            
            def lastEpochMilli = 0; def index = 0;

            // Calculate the time difference
            for (def value : docs) {
              docs[index] = value - lastEpochMilli;
              lastEpochMilli = value;
              index++;
            }
            
            d.put('time_taken', docs);
          }
        }
        
        return docs;
        """
        }
      }
    }
  }
}

Instead of creating a new list, put the epoch milli value into the doc:

def docs = [];
for (s in states) {
  for (d in s) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
    LocalDateTime formatted = LocalDateTime.parse(d['@t'].substring(0, 23), formatter);
    d.put('epoch_milli', formatted.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli());
    docs.add(d);
  }
}

Sorting can be simplified (and improved) I think:

def orderCritera = Comparator.comparing(event -> event['epoch_milli']);
docs.sort(orderCritera);

Now you can loop over the docs again and add the extra field:

def lastEpochMilli = 0;
for (d in docs) {
   // The first doc has no predecessor, so it gets no time_taken
   if (lastEpochMilli > 0) {
       d.put('time_taken', d['epoch_milli'] - lastEpochMilli);
   }
   lastEpochMilli = d['epoch_milli'];
}

(If you don't like to keep the epoch_milli field, you can drop it in this loop)

Finally, return docs, which contains the full documents and not just the array of durations:

return docs

Note: I haven't tested this; I just wrote it off the top of my head. There are surely some bugs, but it hopefully illustrates the idea.

I think you should group by process and thread ID, as you already answered yourself:
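To check the idea offline, here is a plain-Python port of the reduce logic above (a sketch, not Painless). It parses @t the same way the script does, keeping only the first 23 characters, so everything below the millisecond is dropped. Fed with the four @t values that appear in the transform output later in this thread, and assuming the documents arrive in the order listed, it reproduces the epoch_milli and time_taken values shown there (no value, then 48, 0, 1). Because .6688205 and .6682605 both truncate to the same millisecond, the stable sort keeps their original order and the second one gets time_taken 0.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def to_epoch_milli(t):
    """Parse '@t' like the Painless script: keep the first 23 characters
    (yyyy-MM-ddTHH:mm:ss.SSS) and treat the value as UTC."""
    parsed = datetime.strptime(t[:23], "%Y-%m-%dT%H:%M:%S.%f")
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def reduce_docs(states):
    docs = []
    for s in states:
        for d in s:
            d["epoch_milli"] = to_epoch_milli(d["@t"])
            docs.append(d)
    # Stable sort: docs with equal epoch_milli keep their original order.
    docs.sort(key=lambda e: e["epoch_milli"])
    last_epoch_milli = 0
    for d in docs:
        # The first doc has no predecessor, so it gets no time_taken.
        if last_epoch_milli > 0:
            d["time_taken"] = d["epoch_milli"] - last_epoch_milli
        last_epoch_milli = d["epoch_milli"]
    return docs


states = [[
    {"@t": "2021-11-08T07:09:14.6203062Z"},
    {"@t": "2021-11-08T07:09:14.6688205Z"},
    {"@t": "2021-11-08T07:09:14.6682605Z"},
    {"@t": "2021-11-08T07:09:14.6691083Z"},
]]
for d in reduce_docs(states):
    print(d["@t"], d["epoch_milli"], d.get("time_taken"))
```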

"group_by": {
      "ProcessId": {
        "terms": {
          "field": "ProcessId"
        }
      },
      "ThreadId": {
        "terms": {
          "field": "ThreadId"
        }
      }
    },

A continuous transform keeps your destination index up to date and processes new data as it comes in. Note: a bucket gets fully recomputed every time a new event for it comes in. This is not a problem if new events mostly belong to new process and thread IDs, and therefore to new "sessions", but if you keep adding events to existing processes/threads, meaning they are long running, this might be very expensive.
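A rough back-of-the-envelope sketch of why long-running processes/threads get expensive. It assumes the worst case that every new event triggers its own checkpoint and the scripted_metric re-reads the whole bucket each time, so the documents read for one bucket grow as 1 + 2 + ... + n = n(n+1)/2. The cost model and the name docs_processed are hypothetical, not taken from the transform implementation.

```python
def docs_processed(events_in_bucket, recompute_per_event=True):
    """Hypothetical cost model: documents read for one bucket.

    With recompute_per_event=True the bucket is assumed to be fully
    recomputed after each of its events (1 + 2 + ... + n reads);
    otherwise it is computed once at the end (n reads).
    """
    if recompute_per_event:
        return events_in_bucket * (events_in_bucket + 1) // 2
    return events_in_bucket


for n in (10, 1_000, 100_000):
    print(n, docs_processed(n), docs_processed(n, recompute_per_event=False))
```

Short sessions stay cheap either way; the quadratic growth only hurts when a single process/thread keeps collecting events.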

Thank you very much, that's how I imagined it. But how do I visualize it (in a table)?

In Discover, this transform looks weird (all in one):

And in the index I specified in the transform, this field does not exist:

My transform (GET _transform/transform):

{
  "count" : 1,
  "transforms" : [
    {
      "id" : "transform",
      "version" : "7.17.0",
      "create_time" : 1645449631123,
      "source" : {
        "index" : [
          "log-*"
        ],
        "query" : {
          "match_all" : { }
        }
      },
      "dest" : {
        "index" : "transform"
      },
      "frequency" : "1s",
      "pivot" : {
        "group_by" : {
          "Port" : {
            "terms" : {
              "field" : "port"
            }
          }
        },
        "aggregations" : {
          "time_taken" : {
            "scripted_metric" : {
              "init_script" : "state.docs = []",
              "map_script" : "state.docs.add(new HashMap(params['_source']))",
              "combine_script" : "return state.docs",
              "reduce_script" : """
       def docs = [];

        for (s in states) {
          for (d in s) {
            // Convert the date to epoch milliseconds
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
            LocalDateTime formatted = LocalDateTime.parse(d['@t'].substring(0, 23), formatter);
            
            // Store the epoch milliseconds in the doc and add the doc to the list
            d.put('epoch_milli', formatted.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli());
            docs.add(d);
          }
        }
        
        // Sort by earliest date
        def orderCritera = Comparator.comparing(event -> event['epoch_milli']);
        docs.sort(orderCritera);
        
        def lastEpochMilli = 0;
        
        // Calculate the time difference to the previous event
        for (d in docs) {
          if (lastEpochMilli > 0) {
            d.put('time_taken', d['epoch_milli'] - lastEpochMilli);
          }
          lastEpochMilli = d['epoch_milli'];
        }
        
        return docs;
          """
            }
          }
        }
      },
      "settings" : {
        "max_page_search_size" : 500
      }
    }
  ]
}

The result (GET transform/_search):

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "transform",
        "_type" : "_doc",
        "_id" : "AAds-W1CScIYvLqTGUMPSr4AAAAAAAAA",
        "_score" : 1.0,
        "_ignored" : [
          "time_taken.@mt.keyword"
        ],
        "_source" : {
          "Port" : 58576,
          "time_taken" : [
            {
              ...
              "epoch_milli" : 1636355354620,
              "@t" : "2021-11-08T07:09:14.6203062Z",
              "port" : 58576,
              ...
            },
            {
              ...
              "epoch_milli" : 1636355354668,
              "time_taken" : 48,
              "@t" : "2021-11-08T07:09:14.6688205Z",
              "port" : 58576,
              ...
            },
            {
              ...
              "epoch_milli" : 1636355354668,
              "time_taken" : 0,
              "@t" : "2021-11-08T07:09:14.6682605Z",
              "port" : 58576,
              ...
            },
            {
              ...
              "epoch_milli" : 1636355354669,
              "time_taken" : 1,
              "@t" : "2021-11-08T07:09:14.6691083Z",
              "port" : 58576,
              ...
            },
            ...
          ]
        }
      }
    ]
  }
}

The raw output looks good. I am not sure whether Discover can turn this into a table. This is what I meant in my first post with:

Logstash can do this, but unfortunately ingest can't.

Ah, that's what you meant :grinning:. But as I understand it, the transform only runs after I have ingested a file via Logstash. How would it be possible to split the transform output with Logstash afterwards?

You are right, it's not possible. You can let a transform feed into an ingest pipeline, but not into Logstash.

The split functionality is a missing feature in ingest: [Ingest Pipeline] Ability to split documents · Issue #56769 · elastic/elasticsearch · GitHub