Calculate time difference between two log lines in a file

Hello there,
I have a log file (produced by the SLURM workload manager) that looks something like this:

2019-07-15 13:26:40 Started - job id: V3gNDm1MD7unkvVVEmSjiJLoABFKDmABFKDmtdGKDmABFKDmN8Erlm, unix user: 45001:45000, name: "org.nordugrid.ARC-CE-result-ops", owner: "/DC=EU/DC=EGI/C=GR/O=Robots/O=Greek Research and Technology Network/CN=Robot:argo-egi@grnet.gr", lrms: SLURM, queue: parallel1
2019-07-15 13:31:10 Finished - job id: V3gNDm1MD7unkvVVEmSjiJLoABFKDmABFKDmtdGKDmABFKDmN8Erlm, unix user: 45001:45000, name: "org.nordugrid.ARC-CE-result-ops", owner: "/DC=EU/DC=EGI/C=GR/O=Robots/O=Greek Research and Technology Network/CN=Robot:argo-egi@grnet.gr", lrms: SLURM, queue: parallel1, lrmsid: 66918

Basically, these are pairs of "Started" / "Finished" lines that share a unique job_id.
I want to parse the data into JSON, and I also want to add a new field to each "Finished" line with the job duration, obtained by subtracting the "Started" time from the "Finished" time (for the sample above, 13:31:10 - 13:26:40 = 270 seconds).
I started using Logstash filtering last week, so I am quite new to this. From my research I found that the ruby plugin could help me, but so far I haven't managed to get a non-null value for the job duration calculation.
This is my code:

filter {  # filtering process for the SLURM log file
        grok {  # pattern 1 matches "Finished" lines (they carry an lrmsid); pattern 2 matches "Started" lines
                match => { "message" => [
                        "%{TIMESTAMP_ISO8601:[JOB][job_Details][Date1]} %{WORD:[JOB][job_status]} [-] %{DATA}[:] %{DATA:[JOB][job_id]}[,] %{DATA:[JOB][job_Details][user_Type]}[:] %{DATA:[JOB][job_Details][user_ID]}[,] %{DATA}[:] \"%{DATA:[JOB][job_Details][user_Name]}\"[,] %{DATA}: \"%{DATA:[JOB][job_Details][owner]}\"[,] %{DATA}[:] %{DATA:[JOB][job_Details][Workload_Manager_Type]}[,] %{DATA}[:] %{DATA:[JOB][job_Details][queue_Type]}[,] %{DATA}[:] %{NUMBER:[JOB][job_Details][Workload_Manager_id]}",
                        "%{TIMESTAMP_ISO8601:[JOB][job_Details][Date2]} %{WORD:[JOB][job_status]} [-] %{DATA}[:] %{DATA:[JOB][job_id]}[,] %{DATA:[JOB][job_Details][user_Type]}[:] %{DATA:[JOB][job_Details][user_ID]}[,] %{DATA}[:] \"%{DATA:[JOB][job_Details][user_Name]}\"[,] %{DATA}: \"%{DATA:[JOB][job_Details][owner]}\"[,] %{DATA}[:] %{DATA:[JOB][job_Details][Workload_Manager_Type]}[,] %{DATA}[:] %{GREEDYDATA:[JOB][job_Details][queue_Type]}"
                ] }
                remove_field => ["message","host","@version","path"]
        }
        date {
                match => ["[JOB][job_Details][Date1]", "yyyy-MM-dd HH:mm:ss"]
                target => "[JOB][job_Details][Finish_Time]"
        }
        date {
                match => ["[JOB][job_Details][Date2]", "yyyy-MM-dd HH:mm:ss"]
                target => "[JOB][job_Details][Start_Time]"
        }
        ruby {
                init => "require 'time'"
                code => "duration = (Time.parse(event.get('[JOB][job_Details][Date1]')) - Time.parse(event.get('[JOB][job_Details][Date2]'))) rescue nil; event.set('[JOB][job_Details][PROCESS_DURATION]', duration);"
        }
}

And my output:

{
    "@timestamp": "2019-10-15T09:07:29.057Z",
    "JOB": {
        "job_id": "V3gNDm1MD7unkvVVEmSjiJLoABFKDmABFKDmtdGKDmABFKDmN8Erlm",
        "job_Details": {
            "user_ID": "45001:45000",
            "owner": "/DC=EU/DC=EGI/C=GR/O=Robots/O=Greek Research and Technology Network/CN=Robot:argo-egi@grnet.gr",
            "user_Type": "unix user",
            "PROCESS_DURATION": null,
            "queue_Type": "parallel1",
            "Workload_Manager_Type": "SLURM",
            "Start_Time": "2019-07-15T10:26:40.000Z",
            "user_Name": "org.nordugrid.ARC-CE-result-ops"
        },
        "job_status": "Started"
    }
}
{
    "@timestamp": "2019-10-15T09:07:29.057Z",
    "JOB": {
        "job_id": "V3gNDm1MD7unkvVVEmSjiJLoABFKDmABFKDmtdGKDmABFKDmN8Erlm",
        "job_Details": {
            "user_ID": "45001:45000",
            "owner": "/DC=EU/DC=EGI/C=GR/O=Robots/O=Greek Research and Technology Network/CN=Robot:argo-egi@grnet.gr",
            "Workload_Manager_id": "66918",
            "user_Type": "unix user",
            "PROCESS_DURATION": null,
            "queue_Type": "parallel1",
            "Workload_Manager_Type": "SLURM",
            "Finish_Time": "2019-07-15T10:31:10.000Z",
            "user_Name": "org.nordugrid.ARC-CE-result-ops"
        },
        "job_status": "Finished"
    }
}

As you can see, I can add the "PROCESS_DURATION" field to the events (although I would like to add it only to the "Finished" one), but the result is null.
What am I doing wrong here?

Thank you in advance :)

The reason you get null is that grok processes each line as a separate event, so [JOB][job_Details][Date1] and [JOB][job_Details][Date2] never exist on the same event; one of the Time.parse calls therefore raises, and your rescue nil turns the duration into nil. To combine the two lines into one result I would do it in a different way, using aggregate.

    # split the two timestamp tokens and the Started/Finished word off the
    # front of the line; everything else goes to @metadata for the kv filter
    dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{+[@metadata][timestamp]} %{event} - %{[@metadata][restOfLine]}" } }
    date { match => [ "[@metadata][timestamp]", "yyyy-MM-dd HH:mm:ss" ] }
    # parse the comma-separated "key: value" pairs; remove_char_key strips
    # spaces out of key names, so "unix user" becomes "unixuser"
    kv { source => "[@metadata][restOfLine]" target => "[@metadata][keys]" field_split => "," value_split => ":" remove_char_key => " " }
    aggregate {
        task_id => "%{[@metadata][keys][jobid]}"
        code => '
            # copy every key/value parsed by kv into the aggregate map so the
            # flushed event will carry them
            keys = event.get("[@metadata][keys]")
            if keys
                keys.each { |k, v|
                    map[k] = v
                }
            end

            # store the line timestamp as timeStarted or timeFinished
            e = event.get("event")
            map["time#{e}"] = event.get("@timestamp")

            # drop the per-line event; only the aggregated map is kept
            event.cancel
        '
        push_map_as_event_on_timeout => true
        timeout => 3
        timeout_code => '
            event.set("duration", event.get("timeFinished").to_f - event.get("timeStarted").to_f)
        '
    }

will produce

    "@version" => "1",
       "owner" => "/DC=EU/DC=EGI/C=GR/O=Robots/O=Greek Research and Technology Network/CN=Robot:argo-egi@grnet.gr",
      "lrmsid" => "66918",
       "queue" => "parallel1",
 "timeStarted" => 2019-07-15T13:26:40.000Z,
  "@timestamp" => 2019-10-15T15:22:13.905Z,
        "name" => "org.nordugrid.ARC-CE-result-ops",
"timeFinished" => 2019-07-15T13:31:10.000Z,
    "unixuser" => "45001:45000",
        "lrms" => "SLURM",
       "jobid" => "V3gNDm1MD7unkvVVEmSjiJLoABFKDmABFKDmtdGKDmABFKDmN8Erlm",
    "duration" => 270.0

Obviously you can move the keys under [JOB][job_Details] in the keys.each loop, the same way I use string interpolation when storing the timestamps.
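For example, a minimal sketch of that (untested; it builds a nested hash in the map rather than interpolating the whole field reference into the key, since nested hashes in the map become nested fields when the map is pushed as an event):

    aggregate {
        task_id => "%{[@metadata][keys][jobid]}"
        code => '
            keys = event.get("[@metadata][keys]")
            if keys
                # create the [JOB][job_Details] level once, then file every
                # parsed key underneath it
                map["JOB"] ||= { "job_Details" => {} }
                keys.each { |k, v|
                    map["JOB"]["job_Details"][k] = v
                }
            end

            e = event.get("event")
            map["time#{e}"] = event.get("@timestamp")

            event.cancel
        '
        # push_map_as_event_on_timeout, timeout, and timeout_code stay as above
    }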


Ok @Badger, first of all, thank you very much for the solution! It works perfectly on my entire log. The great thing is that with aggregate I now have everything under the same job_id (previously I had two objects per job: one for "Started" and one for "Finished").

Regarding the keys, I renamed the fields with the mutate filter, e.g.:

    mutate { rename => { "name" => "[job_Details][user_Name]" } }

I know this is the naive implementation, but I didn't know how to do it inside the aggregate filter :smiley:
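For reference, the remaining renames can all be grouped into one mutate filter. This is just a sketch: the source keys come from the kv output above, and the targets mirror the JSON structure from my first post, so adjust the mapping to taste:

    mutate {
        rename => {
            "name"     => "[job_Details][user_Name]"
            "unixuser" => "[job_Details][user_ID]"
            "owner"    => "[job_Details][owner]"
            "lrms"     => "[job_Details][Workload_Manager_Type]"
            "queue"    => "[job_Details][queue_Type]"
            "lrmsid"   => "[job_Details][Workload_Manager_id]"
            "jobid"    => "job_id"
        }
    }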

I also have to say that your solution is quite elegant, the way you created all the fields from the "restOfLine" key. However, I do have a couple of questions about your method, namely these two parts:

    keys.each { |k, v|
        map[k] = v
    }

and

    push_map_as_event_on_timeout => true
    timeout => 3
    timeout_code => '...'

Could you please elaborate on what this is actually doing, and why you went this way? Thank you very much <3

When using push_map_as_event_on_timeout, the only fields on the resulting event are whatever was added to the map, so I configured the kv filter to put all of the extra fields into a known place ([@metadata][keys]), then in the aggregate filter I copy (or overwrite) each of the keys into the map. As for the timeout settings: every original event is cancelled, so the aggregated map is the only thing left to emit. push_map_as_event_on_timeout => true tells aggregate to turn the map into a new event once no event with the same task_id has been seen for timeout seconds (3 here), and timeout_code then runs against that generated event, which is where the duration gets computed.
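If your log can contain unpaired lines (a job that never finishes, say), you may want to guard the subtraction. A variant of the timeout_code, untested:

    timeout_code => '
        # only compute the duration when both timestamps made it into the
        # map; an unpaired Started or Finished line would otherwise produce
        # a meaningless number
        if event.get("timeStarted") && event.get("timeFinished")
            event.set("duration", event.get("timeFinished").to_f - event.get("timeStarted").to_f)
        end
    '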
