How to calculate and present times between logs

Hey, I'm a new user of ELK with Logstash.

I am using this as a config:

input {
    file {
        path => "/myapp/Logs/*.slog"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    json{
        source => "message"
    }
}

output {
    elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "myapp-json-logs"
    }
}

The logs that it reads have this format:

{"@time":"2022-08-22T17:06:24.8070049+03:00" "@Moving":"< important app 1bla 1bla 1bla "}
<other logs..>
<other logs..>
<other logs..>
{"@time":"2022-08-22T17:16:25.9126576+03:00" "@Moving":"> important app bla2 bla2 bla2 "}
<other logs..>
<other logs..>
{"@time":"2022-08-22T17:23:24.8070049+03:00" "@Moving":"< important app 1bla 1bla 1bla "}
<other logs..>
<other logs..>
<other logs..>
{"@time":"2022-08-22T17:26:25.9126576+03:00" "@Moving":"> important app bla2 bla2 bla2" }

I am trying to find a way to calculate the time that passes between a log line that includes "@Moving":"< important app and the next log line that includes "@Moving":"> important app.

This pair of lines repeats several times in the .slog files.

If someone can give me direction I will be very grateful.

You could use an aggregate filter to do that:

    if [message] =~ /"@Moving":"/ {
        grok { match => { "message" => '{"@time":"(?<[@metadata][timestamp]>[^"]+)" "@Moving":"(?<[@metadata][msg]>[^"]+)"' } }
        mutate { add_field => { "[@metadata][json]" => '{ "@timestamp": "%{[@metadata][timestamp]}" }' } }
        json { source => "[@metadata][json]" }

        mutate { add_field => { "[@metadata][taskId]" => "anyConstant" } }
        if [@metadata][msg] =~ /^</ {
            aggregate {
                task_id => "%{[@metadata][taskId]}"
                code => 'map["startTime"] = event.get("@timestamp").to_f'
                map_action => "create"
            }
        } else {
            aggregate {
                task_id => "%{[@metadata][taskId]}"
                code => 'event.set("delta", event.get("@timestamp").to_f - map["startTime"])'
                map_action => "update"
                end_of_task => true
            }
        }
    }

which will produce

     "delta" => 181.1056525707245,

for the second pair of lines. Note that using aggregate requires pipeline.workers to be 1 and pipeline.ordered to be true.

Hey @Badger, thanks for the help.
I just applied this in my .config file, and the result was:

(It only discovers one log now: Sep 1, 2022 @ 15:25:47.491 _@timestamp:%{[@metadata][timestamp]} @Moving:< important app @time:Aug 22, 2022 @ 16:06:24.807 @timestamp:Sep 1, 2022 @ 15:25:47.491 @version:1 host:b4b8f10395e3 message:{"@time":"2022-08-22T17:06:24.8070049+03:00", "@Moving":"< important app "} path:/XwinSys/Logs/log.slog tags:_grokparsefailure, _timestampparsefailure _id:L446-YIBF2xDdXQmqk8h _index:xwinsys-json-logs _score: - _type:_doc )

I do not get the "delta" field.

My log file:

{"@time":"2022-08-22T17:06:24.8070049+03:00", "@Moving":"< important app "}
{"@time":"2022-08-22T17:16:25.9126576+03:00", "@Moving":"> important app "}

.config:

input {
    file {
        path => "Logs/*.slog"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    json{
        source => "message"
    }

    if [message] =~ /"@Moving":"/ {
        grok { match => { "message" => '{"@time":"(?<[@metadata][timestamp]>[^"]+)" "@Moving":"(?<[@metadata][msg]>[^"]+)"' } }
        mutate { add_field => { "[@metadata][json]" => '{ "@timestamp": "%{[@metadata][timestamp]}" }' } }
        json { source => "[@metadata][json]" }

        mutate { add_field => { "[@metadata][taskId]" => "anyConstant" } }
        if [@metadata][msg] =~ /^</ {
            aggregate {
                task_id => "%{[@metadata][taskId]}"
                code => 'map["startTime"] = event.get("@timestamp").to_f'
                map_action => "create"
            }
        } else {
            aggregate {
                task_id => "%{[@metadata][taskId]}"
                code => 'event.set("delta", event.get("@timestamp").to_f - map["startTime"])'
                map_action => "update"
                end_of_task => true
            }
        }
    }
}

output {
    elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "json-logs"
    }
}

?

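You originally said your log format was

{"@time":"2022-08-22T17:06:24.8070049+03:00" "@Moving":"< important app 1bla 1bla 1bla "}

now you are saying it is

{"@time":"2022-08-22T17:06:24.8070049+03:00", "@Moving":"< important app "}

You will need to make appropriate changes to the configuration for the different format.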
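
As a rough sketch of one such change: the grok pattern in the filter expects a single space between the "@time" value and "@Moving", while the log file posted later has a comma there as well. That mismatch would explain the _grokparsefailure tag, and the _timestampparsefailure that follows from the unset [@metadata][timestamp]. Assuming the comma is the only difference, making it optional in the pattern should let both formats match. This has not been run against your files:

```
# Assumption: the only change in the log format is a comma after the "@time" value.
# ",?" makes that comma optional, so lines with or without it should match.
grok { match => { "message" => '{"@time":"(?<[@metadata][timestamp]>[^"]+)",? "@Moving":"(?<[@metadata][msg]>[^"]+)"' } }
```

The rest of the filter (the mutate, json and aggregate parts) would stay as it is.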

@ Badger many thanks for your help - I think I understand the code now! :))

Please ignore my other message. And again, thank you! :))