I had some time to look at this, and I think you can do it with an aggregate filter. Note that pipeline.workers must be set to 1 and pipeline.ordered must evaluate to true; otherwise events may be processed out of sequence and the aggregate maps will not be reliable.
Consider the input file:
{ "id": 1, "tags": [ "step_1", "pest_1", "foo" ], "@timestamp": "2024-05-04T11:40:38.132Z" }
{ "id": 2, "tags": [ "step_1" ], "@timestamp": "2024-05-04T11:40:38.132Z" }
{ "id": 2, "tags": [ "step_2" ], "@timestamp": "2024-05-04T11:40:38.444Z" }
{ "id": 1, "tags": [ "step_2" ], "@timestamp": "2024-05-04T11:40:38.555Z" }
{ "id": 1, "tags": [ "step_3" ], "@timestamp": "2024-05-04T11:40:42.232Z" }
{ "id": 2, "tags": [ "step_4" ], "@timestamp": "2024-05-04T11:40:39.333Z" }
{ "id": 2, "tags": [ "step_5" ], "@timestamp": "2024-05-04T11:40:44.123Z" }
That can be processed using:
aggregate {
    task_id => "%{id}"
    code => '
        begin
            # Pick out the tag of the form step_N and keep only N
            step = event.get("tags").select { |tag| tag =~ /^step_\d+/ }
            stepNo = step[0].sub(/^step_/, "")
            # Record the timestamp of each step in the map for this id
            map["steps"] ||= {}
            map["steps"][stepNo.to_s] = event.get("@timestamp")
        rescue StandardError => e
            puts e.to_s
        end
    '
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "user_id"
    timeout => 10
    timeout_code => '
        begin
            steps = event.get("steps")
            # Sort numerically so that step 10 follows step 9 rather than step 1
            steps = steps.sort_by { |k, v| k.to_i }
            lastValue ||= []
            steps.each { |k,v|
                if lastValue != [] # Skip first item
                    # Seconds elapsed between this step and the previous one
                    event.set("step_#{k}_#{lastValue[0]}", v - lastValue[1])
                end
                lastValue = [k, v]
            }
        rescue StandardError => e
            puts e.to_s
        end
    '
}
to produce (in addition to the source events) two events like this:
{
    "@timestamp" => 2024-05-04T16:36:49.678604952Z,
         "steps" => {
        "2" => 2024-05-04T11:40:38.555Z,
        "3" => 2024-05-04T11:40:42.232Z,
        "1" => 2024-05-04T11:40:38.132Z
    },
      "@version" => "1",
      "step_3_2" => 3.677,
       "user_id" => "1",
      "step_2_1" => 0.423
}
{
    "@timestamp" => 2024-05-04T16:36:49.679253516Z,
         "steps" => {
        "2" => 2024-05-04T11:40:38.444Z,
        "1" => 2024-05-04T11:40:38.132Z,
        "4" => 2024-05-04T11:40:39.333Z,
        "5" => 2024-05-04T11:40:44.123Z
    },
      "@version" => "1",
      "step_4_2" => 0.889,
      "step_5_4" => 4.79,
       "user_id" => "2",
      "step_2_1" => 0.312
}
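If you want to experiment with the step arithmetic outside Logstash, here is a rough plain-Ruby sketch of the same idea. It is not what the filter runs internally: the names EVENTS, collect_steps and step_deltas are made up for illustration, Time.iso8601 stands in for the Logstash timestamp type, and the deltas are rounded to milliseconds as an assumption to keep float noise out.

```ruby
require "time"

# Sample events mirroring the input file (only the fields the filter uses).
EVENTS = [
  { "id" => 1, "tags" => ["step_1", "pest_1", "foo"], "@timestamp" => "2024-05-04T11:40:38.132Z" },
  { "id" => 2, "tags" => ["step_1"], "@timestamp" => "2024-05-04T11:40:38.132Z" },
  { "id" => 2, "tags" => ["step_2"], "@timestamp" => "2024-05-04T11:40:38.444Z" },
  { "id" => 1, "tags" => ["step_2"], "@timestamp" => "2024-05-04T11:40:38.555Z" },
  { "id" => 1, "tags" => ["step_3"], "@timestamp" => "2024-05-04T11:40:42.232Z" },
  { "id" => 2, "tags" => ["step_4"], "@timestamp" => "2024-05-04T11:40:39.333Z" },
  { "id" => 2, "tags" => ["step_5"], "@timestamp" => "2024-05-04T11:40:44.123Z" }
].freeze

# Rough equivalent of the "code" option: one timestamp per step number, per id.
def collect_steps(events)
  maps = Hash.new { |hash, key| hash[key] = {} }
  events.each do |event|
    tag = event["tags"].find { |t| t =~ /^step_\d+/ }
    next if tag.nil? # ignore events that carry no step_N tag
    step_no = tag.sub(/^step_/, "")
    maps[event["id"].to_s][step_no] = Time.iso8601(event["@timestamp"])
  end
  maps
end

# Rough equivalent of "timeout_code": seconds between consecutive steps,
# with steps ordered numerically by step number.
def step_deltas(steps)
  deltas = {}
  steps.sort_by { |k, _| k.to_i }.each_cons(2) do |(prev_k, prev_t), (k, t)|
    deltas["step_#{k}_#{prev_k}"] = (t - prev_t).round(3)
  end
  deltas
end

results = collect_steps(EVENTS).transform_values { |steps| step_deltas(steps) }
results.each { |id, deltas| puts "#{id}: #{deltas}" }
```

The each_cons(2) call plays the role of the lastValue bookkeeping in the filter: it walks the sorted steps in adjacent pairs, so the first step is skipped automatically.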
Obviously this can be adjusted in many, many ways.