Convert iso 8601 timestamp to UNIX_MS

Hello,

I have json logs wich have a field timestamp in iso 8601 format I want to convert it to UNIX_MS so I can calculate the response time between step 1 and 2, 1 and 3 for each mid.

How this can be done ?

This is an example of my log files.

{“log_level”:“INFO”,“timestamp”:“2021-12-22T11:49:06.124890Z”,“event_type”:“step1”,“mid”:“96712abc”}
{“log_level”:“INFO”,“timestamp”:“2021-12-22T11:49:07.124897Z”,“event_type”:“step2”,“mid”:“96712abc”} 
{“log_level”:“INFO”,“timestamp”:“2021-12-22T11:49:07.124899Z”,“event_type”:“step3”,“mid”:“96712abc”} 
{“log_level”:“INFO”,“timestamp”:“2021-12-22T11:49:08.124900Z”,“event_type”:“step1”,“mid”:“875dbca”}

Is that 3 lines or 4?

1 Like

4 lines, sorry It was a typing error

You want something very much like example 1 in the aggregate documentation.

Although the logstash Timestamp object supports nanosecond precision, the date filter does not. However, the json filter does, so we have to rename the timestamp field to @timestamp to get the json filter to use it to set [@timestamp].

    mutate { gsub => [ "message", "“", '"', "message", "”", '"' ] }
    mutate { gsub => [ "message", "timestamp", "@timestamp" ] }
    json { source => "message" remove_field => [ "message" ] }
    if [event_type] == "step1" {
        aggregate {
            task_id => "%{mid}"
            code => 'map["step1Time"] = event.get("@timestamp").to_f'
            map_action => "create"
        }
    }
    if [event_type] == "step2" {
        aggregate {
            task_id => "%{mid}"
            code => 'map["step2Time"] = event.get("@timestamp").to_f'
            map_action => "update"
        }
    }
    if [event_type] == "step3" {
        aggregate {
            task_id => "%{mid}"
            code => '
                map["step3Time"] = event.get("@timestamp").to_f
                event.set("delta12", map["step2Time"] - map["step1Time"])
                event.set("delta13", map["step3Time"] - map["step1Time"])
            '
            map_action => "update"
            end_of_task => true
            timeout => 120
        }
    }

Make sure you understand the requirements for pipeline.workers and pipeline.ordered document for the aggregate filter.

1 Like

Thank you so much Badger, can you please explain this code, I found in the documentation that gsub filter match string field and replace them with the last value . Is the message will be removed ?

And in the aggregation we have
event.get("@timestamp").to_f'
Is the "to_f" convert the timstamp to millisecond ?

The [message] field is modified by the gsub, not removed. The curly quotes are changed to straight quotes, and the string timestamp is replaced with @timestamp.

The .to_f converts the LogStash::Timestamp object to a floating point number. The source JSON has microsecond precision so the floating point number will have microsecond precision. But, as always with floating point numbers, the accuracy of the digital representation is not exact. The difference between 06.124890 and 07.124897 may not be 1.000007, it might be 1.00000699827

1 Like

Thanks sir, I tried this on grok debugger from dev tools but it didn't work, I got this error:

`

Unable to find pattern [mid] in Grok's pattern dictionary

`

Not sure why you would be using a grok debugger. There is no grok filter in the configuration I suggested.

1 Like

Yes yes, you're right there is no need for the grok debugger, but I add the configuration to logstash.conf and nothing is done, no field is added

Hello Badger please can you tell me what is the purpose of using this line:

json { source => "message" remove_field => [ "message" ] }

Your [message] field contains JSON. That will parse it so that you have [log_level], [timestamp], [event_type], and [mid] fields.

If it is successfully parsed then the [message] field will be deleted, so that you do not have duplicate data. If the parsing fails then the [messsage] field will remain so that you can see what is wrong with it.

1 Like

Thanks you Badger for your help.

Do I have to set anything apart from the logstash configuration to run the filter?

Because I copied it as it is and it didn't work, also there is no error message in the logs of the logstash image on docker.

I don't think so.

1 Like

Is there any documentation for pipeline.workers and pipeline.ordered document ?

They are documented here. As with most pipeline setting they can be set globally or per-pipeline.

1 Like