I had some time to look at this and think you can do it with an aggregate filter. Note that pipeline.ordered must evaluate to true and pipeline.workers must be set to 1.
Consider the input file
{ "id": 1, "tags": [ "step_1", "pest_1", "foo" ], "@timestamp": "2024-05-04T11:40:38.132Z" }
{ "id": 2, "tags": [ "step_1" ], "@timestamp": "2024-05-04T11:40:38.132Z" }
{ "id": 2, "tags": [ "step_2" ], "@timestamp": "2024-05-04T11:40:38.444Z" }
{ "id": 1, "tags": [ "step_2" ], "@timestamp": "2024-05-04T11:40:38.555Z" }
{ "id": 1, "tags": [ "step_3" ], "@timestamp": "2024-05-04T11:40:42.232Z" }
{ "id": 2, "tags": [ "step_4" ], "@timestamp": "2024-05-04T11:40:39.333Z" }
{ "id": 2, "tags": [ "step_5" ], "@timestamp": "2024-05-04T11:40:44.123Z" }
That can be processed using
aggregate {
task_id => "%{id}"
code => '
begin
step = event.get("tags").select { |tag| tag =~ /^step_\d+/ }
stepNo = step[0].sub(/^step_/, "")
map["steps"] ||= {}
map["steps"][stepNo.to_s] = event.get("@timestamp")
rescue StandardError => e
puts e.to_s
end
'
push_map_as_event_on_timeout => true
timeout_task_id_field => "user_id"
timeout => 10
timeout_code => '
begin
steps = event.get("steps")
steps = steps.sort_by { |k, v| k}
lastValue ||= []
steps.each { |k,v|
if lastValue != [] # Skip first item
event.set("step_#{k}_#{lastValue[0]}", v - lastValue[1])
end
lastValue = [k, v]
}
rescue StandardError => e
puts e.to_s
end
'
}
to produce (in addition to the source events) two events like this
{
"@timestamp" => 2024-05-04T16:36:49.678604952Z,
"steps" => {
"2" => 2024-05-04T11:40:38.555Z,
"3" => 2024-05-04T11:40:42.232Z,
"1" => 2024-05-04T11:40:38.132Z
},
"@version" => "1",
"step_3_2" => 3.677,
"user_id" => "1",
"step_2_1" => 0.423
}
{
"@timestamp" => 2024-05-04T16:36:49.679253516Z,
"steps" => {
"2" => 2024-05-04T11:40:38.444Z,
"1" => 2024-05-04T11:40:38.132Z,
"4" => 2024-05-04T11:40:39.333Z,
"5" => 2024-05-04T11:40:44.123Z
},
"@version" => "1",
"step_4_2" => 0.889,
"step_5_4" => 4.79,
"user_id" => "2",
"step_2_1" => 0.312
}
Obviously this can be adjusted in many, many ways.