[Solved] Use grok'd timestamp across multiple messages

Hello,

I am currently building a pipeline to parse iostat data and send it to Elasticsearch. I am using grok to find the lines I am interested in. Some lines contain the timestamp; the other lines contain the device information for that timestamp.

#Sample Data
#
#Linux OSWbb v7.3.2
#zzz ***Thu Sep 22 08:00:07 BST 2016
#avg-cpu:  %user   %nice %system %iowait  %steal   %idle
#          11.86    0.00    3.67   14.67    0.00   69.81
#
#Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await  svctm  %util
#xvda              0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00   0.00   0.00
#xvdb              0.00    11.00    0.00  121.00     0.00   528.00     8.73     0.23    1.90   0.10   1.20
#
#zzz ***Thu Sep 22 08:00:27 BST 2016
#avg-cpu:  %user   %nice %system %iowait  %steal   %idle
#           9.13    0.00    4.03   15.99    0.00   70.85
#
#Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await  svctm  %util
#xvda              0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00   0.00   0.00
#xvdb              0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00   0.00   0.00

input {
	file {
		path => "C:\Workspace\test.dat"
		start_position => "beginning"
		ignore_older => 0
		sincedb_path => "NUL"
	}
}
filter {
	mutate {
		gsub => ["message", "\r", " "]
	}
	if [message] =~ "^Linux" or [message] =~ "^avg-cpu" or [message] =~ "^ " or [message] == "" or [message] =~ "^Device" {
		drop {}
	} else {
		if [message] =~ "^zzz" {
			grok {
				match => ["message", "%{DATA:field1} +%{DATA:field2} +%{MONTH:month} +%{NUMBER:day} +%{TIME:time} +%{DATA:field4} +%{YEAR:year}"]
			}
			mutate {
				add_field => {
					"timestamp" => "%{day} %{month} %{year} %{time}"
				}
			}
		} else {
			grok {
				match => ["message", "%{DATA:device} +%{NUMBER:read_request_merge_avg:float} +%{NUMBER:write_request_merge_avg:float} +%{NUMBER:read_iops_avg:float} +%{NUMBER:write_iops_avg:float} +%{NUMBER:MB_read_avg:float} +%{NUMBER:MB_write_avg:float} +%{NUMBER:avg_sector_size:float} +%{NUMBER:avg_queue_size:float} +%{NUMBER:io_wait_time_ms:float} +%{NUMBER:io_service_time_ms:float} +%{NUMBER:disk_util_perc:float}"]
			}
		}
	}
}
output {
	stdout {
		codec => rubydebug
	}
}

I can match the lines without any problems, but what I really want to do is get the timestamp from the first match and add it as a field to the subsequent line matches so it goes nicely into Elasticsearch, e.g.:

{
	"message" => "xvda              0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00   0.00   0.00 ",
	"@version" => "1",
	"@timestamp" => "2016-10-27T23:11:32.507Z",
	"path" => "C:\\Users\\thomas.a.baker\\Downloads\\test.dat",
	"host" => "CPX-I9CC2XTM7B9",
	"device" => "xvda",
	"read_request_merge_avg" => 0.0,
	"write_request_merge_avg" => 0.0,
	"read_iops_avg" => 0.0,
	"write_iops_avg" => 0.0,
	"MB_read_avg" => 0.0,
	"MB_write_avg" => 0.0,
	"avg_sector_size" => 0.0,
	"avg_queue_size" => 0.0,
	"io_wait_time_ms" => 0.0,
	"io_service_time_ms" => 0.0,
	"disk_util_perc" => 0.0,
	"timestamp" => "22 Sep 2016 08:00:27"
}

I would then use the date filter to replace @timestamp, but the challenge for me is making that timestamp available to be used again. Is this even possible?
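For reference, the date filter step I have in mind could look something like this (the pattern is inferred from the example timestamp above, so treat it as a sketch):

```
date {
	match => ["timestamp", "dd MMM yyyy HH:mm:ss"]
}
```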

Hope my question makes sense.

Thanks.

Treat it as one event: use the multiline codec to read in the whole iostat snapshot and parse out the date.

You can then use a filter like "split" to create the new events.
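A rough sketch of that multiline approach (codec options assumed, not tested against your data):

```
input {
	file {
		path => "C:\Workspace\test.dat"
		start_position => "beginning"
		sincedb_path => "NUL"
		codec => multiline {
			# Start a new event at each "zzz" timestamp line; everything
			# up to the next one belongs to the same iostat snapshot.
			pattern => "^zzz"
			negate => true
			what => "previous"
		}
	}
}
```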

Thanks Ed - I tried that earlier and ran into issues. However, I have literally just found the answer thanks to another article: https://discuss.elastic.co/t/keeping-global-variables-in-ls/39908.

Relevant portion of the code updated:

if [message] =~ "^zzz" {
	grok {
		match => ["message", "%{DATA:field1} +%{DATA:field2} +%{MONTH:month} +%{NUMBER:day} +%{TIME:time} +%{DATA:field4} +%{YEAR:year}"]
	}
	mutate {
		add_field => {
			"timestamp" => "%{day} %{month} %{year} %{time}"
		}
	}
	ruby {
		# Stash the timestamp in a class variable so it survives across events
		init => "@@timestamp = ''"
		code => "@@timestamp = event['timestamp']"
	}
	drop {}
} else {
	grok {
		match => ["message", "%{DATA:device} +%{NUMBER:read_request_merge_avg:float} +%{NUMBER:write_request_merge_avg:float} +%{NUMBER:read_iops_avg:float} +%{NUMBER:write_iops_avg:float} +%{NUMBER:kB_read_avg:float} +%{NUMBER:kB_write_avg:float} +%{NUMBER:avg_sector_size:float} +%{NUMBER:avg_queue_size:float} +%{NUMBER:io_wait_time_ms:float} +%{NUMBER:io_service_time_ms:float} +%{NUMBER:disk_util_perc:float}"]
	}
	ruby {
		# Stamp each device line with the last timestamp seen
		code => "event['timestamp'] = @@timestamp"
	}
}

Simple when you know how!
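To see why this works, here is a minimal plain-Ruby sketch (outside Logstash; the names are mine, not from the pipeline) of sharing state across calls with a class variable, just as @@timestamp is shared across events in the ruby filter:

```ruby
class TimestampCarrier
  @@timestamp = ''

  # For "zzz" lines: remember the parsed timestamp (the event itself is dropped).
  def self.remember(ts)
    @@timestamp = ts
  end

  # For device lines: stamp the event with the last timestamp seen.
  def self.stamp(event)
    event['timestamp'] = @@timestamp
    event
  end
end

TimestampCarrier.remember('22 Sep 2016 08:00:27')
event = TimestampCarrier.stamp({ 'device' => 'xvda' })
# event now carries both the device and the remembered timestamp
```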


Yeah, the Ruby filter lets you do a lot of things like that; I should have thought of it.

One warning about it, though: the input thread is different from the filter threads, so in theory you could have the wrong date stamped on the wrong message.

File -> Input thread -+-> Filter thread 1: gets date 1, sets date 1
                      |
                      +-> Filter thread 2: gets date 2, stamps date 1

The chance of this happening is probably low, so I just wanted to warn you of the possibility, as it is probably not thread safe. Then again, the Ruby filter could be scoped to just its thread.
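A hypothetical way to guard against the torn-read/write part of that risk (my own sketch, not from this thread) is to wrap the shared value in a Mutex; in the ruby filter you could create the mutex in `init` and synchronize in `code`:

```ruby
# Guard the shared timestamp with a Mutex so two filter threads cannot
# interleave a read with a half-finished write. Names are illustrative.
module SharedTimestamp
  @mutex = Mutex.new
  @value = ''

  def self.set(ts)
    @mutex.synchronize { @value = ts }
  end

  def self.get
    @mutex.synchronize { @value }
  end
end

SharedTimestamp.set('22 Sep 2016 08:00:27')
ts = SharedTimestamp.get  # the last value written, read atomically
```

Note this only makes each read/write atomic; it cannot restore event ordering across filter workers, so if ordering matters, running the pipeline with a single filter worker (-w 1) is the safer fix.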

Hmmm, thanks - something I wouldn't even have thought about. This is not a production pipeline anyway, so I'll take the small risk for now :slight_smile:
