How do I match a newline in grok/logstash

I am using Logstash to parse and filter the data. The input data looks something like:

Tue Apr 05 01:33:13 EDT 2016
 r/s w/s  cache free_mem used_mem swap_mem page faults id wa
   0  0      0 7535996  72612 232184    0    1     19   35   100
   0  0      0 7535988  72612 232188    0    0     283  532   100
   0  0      0 7535988  72620 232188    0    0     279  533   100
   0  0      0 7535988  72620 232188    0    0     275  530   100
   0  0      0 7536020  72628 232188    0    0     284  535   100

Here is the filter I am applying in the logstash config file:

filter {
  grok {
    match => { "message" => "%{NUMBER:r:int} +%{NUMBER:w:int} +%{NUMBER:cache:int} +%{NUMBER:free_mem:int} +%{NUMBER:used_mem:int} +%{NUMBER:swap_mem:int} +%{NUMBER:page:int} +%{NUMBER:faults:int} +%{NUMBER:id:int} +%{NUMBER:wa:int}" }
  }
}

Apart from the timestamp, every field matches. If I add %{DATESTAMP_OTHER:Time} to the filter to match the time, the output shows "_grokparsefailure" for the first line, while the remaining data is filtered properly.

How can I match a newline in the grok filter so that the whole event is parsed as filtered data?

If you want each line individually, you can have multiple match patterns in the grok.

match => {
  "message" => [
    '%{NUMBER:pattern1}', 
    '%{DATA:pattern2}'
  ]
}
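
Note that by default grok stops at the first pattern in the list that matches (break_on_match => true), so put the more specific patterns first.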

If not, and you are looking to group all the lines together, you need to look at the multiline codec. Once the multiline codec has combined the lines into a single event, you would need a grok pattern that matches it (the message would contain a \n for each new line).
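
For reference, a minimal multiline input sketch might look like this (the pattern here is an assumption based on your sample, where every new event begins with a day-of-week timestamp line):

input {
  stdin {
    codec => multiline {
      # any line that does NOT start with a day name belongs to the previous event
      pattern => "^%{DAY} "
      negate  => true
      what    => "previous"
    }
  }
}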


Jared, thank you for your response. I have used the multiline codec; it groups the lines that do not start with a timestamp into a single event. But my grok filter only has patterns for a single row of fields, so only the first row's values show up in the output. How can I get all the values? In the input above there are 5 rows sharing the same header/column names.

Config file:

input {
  stdin {
    codec => multiline {
      pattern => "%{DATESTAMP_OTHER:Time}"
      negate => true
      what => "previous"
    }
  }
}

filter {
  mutate {
    gsub => ["message", "\n", ""]
  }

  grok {
    match => ["message", "(?m)%{NUMBER:r:int} +%{NUMBER:w:int} +%{NUMBER:cache:int} +%{NUMBER:free_mem:int} +%{NUMBER:used_mem:int} +%{NUMBER:swap_mem:int} +%{NUMBER:page:int} +%{NUMBER:faults:int} +%{NUMBER:id:int} +%{NUMBER:wa:int}"]
  }
}

output {
  stdout { codec => rubydebug }
}

stdout output:

r: 0
w: 0
cache: 0
free_mem: 7535996
used_mem: 72612
swap_mem: 232184
page: 0
faults: 1
id: 19
wa: 100

I personally would not discard the \n character; doing so makes it impossible to get the message back to its original format. When the grok is set up correctly, the \n characters will only remain in the original message field.

Could you post the message field from stdout (or the entire stdout)?

I removed the mutate filter from the config file and executed it. Below is the stdout after running the logstash config file:

"@timestamp" => "2016-07-26T16:56:22.517Z",
       "message" => "Tue Apr 05 01:33:13 EDT 2016\n r/s w/s  cache free_mem used_mem swap_mem page faults id wa\n 0  0      0 7535996  72612 232184       1     19   35   100\n 0  0      0 7535988  72612 232188       0     283  532  100\n 0  0      0 7535988  72620 232188       0     279  533  100\n 0  0      0 7535988  72620 232188       0     275  530  100\n 0  0      0 7536020  72628 232188       0     284  535  100",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "txcdtl01ag658v",
             "r" => 0,
             "w" => 0,
         "cache" => 0,
      "free_mem" => 7535996,
      "used_mem" => 72612,
      "swap_mem" => 232184,
          "page" => 1,
        "faults" => 19,
            "id" => 35,
            "wa" => 100
}

Here, as you can see, I was able to get only the first line's values in the output. I need all the values that were combined into the single event.

Thank you.

Looking at this, it seems to be the output of vmstat. Each line represents a new measurement (e.g. every 2 seconds). I would expect that you would want each line individually, taking the date from the start/header message and calculating time + interval * count. E.g.

{
  "host" => "txcdtl01ag658v",
  "timestamp" => "Tue Apr 05 01:33:13 EDT 2016",
  "r" => 0,
  "w" => 0,
  "free_mem" => 7535996,
  ...
}
{
  "host" => "txcdtl01ag658v",
  "timestamp" => "Tue Apr 05 01:33:15 EDT 2016",
  "r" => 0,
  "w" => 0,
  "free_mem" => 7535998,
  ...
}

Or, are you looking to have something like this:

{
  "host" => "txcdtl01ag658v",
  "timestamp" => "Tue Apr 05 01:33:13 EDT 2016",
  "r" => [ 0, 0, 0, 0 ],
  "w" => [ 0, 0, 0, 0 ],
  "free_mem" => [ 7535996, 7535998, 7535998, 7535996, 7535995 ],
  ...
}

Jared, I need each line individually, and each should carry the timestamp from the start of the event. Of the two outputs you posted, the first one is what I want.

Do you have control over the log, and is the old data unnecessary? If so, you can run vmstat with the -t option. It will output a timestamp per line, which can be parsed very easily.

$ vmstat -t -n 1 10
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu----- -----timestamp-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st                 EDT
 2  0   5172 174696  32256 231448    0    0    42   112   29   35  0  0 99  0  0 2016-03-18 22:40:12
 0  0   5172 174684  32256 231448    0    0     0     0   12   16  0  0 100  0  0 2016-03-18 22:40:13
 0  0   5172 174684  32256 231448    0    0     0     0    7    8  0  0 100  0  0 2016-03-18 22:40:14
 0  0   5172 174684  32256 231448    0    0     0     0    9   10  0  0 100  0  0 2016-03-18 22:40:15
 0  0   5172 174684  32256 231448    0    0     0     0    8   10  0  0 100  0  0 2016-03-18 22:40:16
 0  0   5172 174684  32256 231448    0    0     0     0    9   10  0  0 100  0  0 2016-03-18 22:40:17
 0  0   5172 174684  32256 231448    0    0     0     0    7    8  0  0 100  0  0 2016-03-18 22:40:18
 0  0   5172 174684  32256 231448    0    0     0     0    8   10  0  0 100  0  0 2016-03-18 22:40:19
 0  0   5172 174684  32256 231448    0    0     0     0    8   10  0  0 100  0  0 2016-03-18 22:40:20
 0  0   5172 174684  32256 231448    0    0     0     0   10   10  0  0 100  0  0 2016-03-18 22:40:21

Then you can simply do something like this.

filter {
  if [type] == "vmstat" {
    # drop the two header lines that vmstat prints
    if [message] =~ "procs --" or [message] =~ "r  b   swpd" {
      drop {}
    }
    csv {
      separator => " "
      columns => ["[vmstat][r]", "[vmstat][b]", "[vmstat][swpd]", "[vmstat][free]", "[vmstat][buff]",
        "[vmstat][cache]", "[vmstat][si]", "[vmstat][so]", "[vmstat][bi]", "[vmstat][bo]", "[vmstat][in]",
        "[vmstat][cs]", "[vmstat][us]", "[vmstat][sy]", "[vmstat][id]", "[vmstat][wa]", "[vmstat][st]", "date", "time"]
    }
    mutate {
      convert => [
        "[vmstat][r]", "integer",
        "[vmstat][b]", "integer",
        "[vmstat][swpd]", "integer",
        "[vmstat][free]", "integer",
        "[vmstat][buff]", "integer",
        "[vmstat][cache]", "integer",
        "[vmstat][si]", "integer",
        "[vmstat][so]", "integer",
        "[vmstat][bi]", "integer",
        "[vmstat][bo]", "integer",
        "[vmstat][in]", "integer",
        "[vmstat][cs]", "integer",
        "[vmstat][us]", "integer",
        "[vmstat][sy]", "integer",
        "[vmstat][id]", "integer",
        "[vmstat][wa]", "integer",
        "[vmstat][st]", "integer"
      ]
      # stitch the date and time columns into one field for the date filter
      add_field => { "timestamp" => "%{date} %{time}" }
    }
    date {
      match => ["timestamp", "YYYY-MM-dd HH:mm:ss"]
      # remove time related fields once @timestamp has been set
      remove_field => [ "date", "time", "timestamp" ]
    }
  }
}
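
One caveat worth noting (an assumption about your data, not something verified here): vmstat pads its columns with runs of spaces, so a single-space csv separator can produce empty columns. A possible pre-processing step is to squeeze the whitespace first:

filter {
  mutate {
    # collapse runs of spaces so the single-space csv separator lines up,
    # then strip the whitespace vmstat prints around each row
    gsub  => [ "message", " +", " " ]
    strip => [ "message" ]
  }
}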

Keep in mind, the timezone was not set. By default, Logstash will use the timezone/offset of the system it is running on.
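
If the machine producing the logs is in a different zone, the date filter's timezone option can pin it explicitly; for example (the zone name here is an assumption):

date {
  match    => ["timestamp", "YYYY-MM-dd HH:mm:ss"]
  # interpret the parsed time as US Eastern rather than the local zone
  timezone => "America/New_York"
}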

I don't have control over the log data; the format I posted earlier is what I get. Does the above configuration work for my data?

Omitting the date / time parts of the config, yes.

It will be rather tricky to get the first timestamp from log line 1 and use it for each subsequent message by adding the vmstat interval (which is not provided). Logstash does not have a line number for the file input, and there is no guarantee that the lines would be processed in a specific order (due to the worker model for filters/outputs). The multiline codec might help in this situation, if the messages are rather short (how many lines get grouped into each multiline message?). It might be possible to use the ruby filter to iterate through the message looking for the \n characters and applying a calculated timestamp. Overall, this will be rather complex, and it can be problematic/incorrect if not very tightly controlled (assumptions about the vmstat interval, assumptions about how many lines get rolled into a single message, etc.).
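
A rough sketch of that ruby-filter idea, assuming the multiline codec has already grouped one vmstat block per event, a fixed 2-second interval between rows (an assumption; adjust to your actual interval), and the Logstash 5+ event API (event.get/event.set; on 2.x use event['message'] instead):

filter {
  ruby {
    code => "
      require 'time'
      interval = 2                                  # assumed seconds between vmstat rows
      lines = event.get('message').split(/\n/)
      base  = Time.parse(lines.shift) rescue nil    # first line is the date header
      lines.shift                                   # second line is the column header
      if base
        # prepend a calculated timestamp to every remaining row
        rows = lines.each_with_index.map { |l, i| (base + i * interval).iso8601 + ' ' + l.strip }
        event.set('rows', rows)
      end
    "
  }
  # emit one event per element of the rows array
  split { field => "rows" }
}

Each resulting event would then carry a rows field like "2016-04-05T01:33:15-04:00 0 0 0 7535988 ...", which a grok or csv filter can split into the timestamp and per-column values.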

Jared, in your config file you are using the csv filter, whereas in my config file I am trying to parse the data using the grok filter. What is the right filter to apply for the vmstat data? Also, I have tried your config mentioned above; it didn't work for me.

Either way is fine. The csv method at face value is simpler, since the output of vmstat is structured and known. Grok would use regular expressions. Without looking at the underlying code, it is rather hard to say which method would be better here.
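
For comparison, a grok-based sketch of the same parse might look like this (flat field names are assumed here for simplicity, and it expects the -t timestamped output shown above):

grok {
  # one vmstat -t line: 17 numeric columns followed by "YYYY-MM-dd HH:mm:ss"
  match => { "message" => "^ *%{NUMBER:vmstat_r:int} +%{NUMBER:vmstat_b:int} +%{NUMBER:vmstat_swpd:int} +%{NUMBER:vmstat_free:int} +%{NUMBER:vmstat_buff:int} +%{NUMBER:vmstat_cache:int} +%{NUMBER:vmstat_si:int} +%{NUMBER:vmstat_so:int} +%{NUMBER:vmstat_bi:int} +%{NUMBER:vmstat_bo:int} +%{NUMBER:vmstat_in:int} +%{NUMBER:vmstat_cs:int} +%{NUMBER:vmstat_us:int} +%{NUMBER:vmstat_sy:int} +%{NUMBER:vmstat_id:int} +%{NUMBER:vmstat_wa:int} +%{NUMBER:vmstat_st:int} +%{TIMESTAMP_ISO8601:timestamp}$" }
}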

My example assumes that you did not use the multiline codec to group the messages; you never answered my questions about that. The example looked specifically for type == vmstat, dropped the two header lines, and parsed the rest with csv by splitting on spaces. I had also enabled the -t option when I invoked vmstat (shown in the example), so the last couple of lines parse the timestamp from vmstat and replace the Logstash @timestamp appropriately.

I am not using multiline with the csv filter; only the grok filter config has the multiline codec. These are two different configuration files I am using to solve this problem. My input data has two headers (the timestamp and the column names); how do I remove them while parsing with csv? I still need to capture the timestamp to know at what time the event occurred. The interval of the data is 10 minutes; every 10 minutes a block of data is written into the vmstat log file.

Also, I am expecting output something like this:

{
  "host" => "hostname",
  "timestamp" => "Tue Apr 05 01:33:13 EDT 2016",
  "r" => 0,
  "w" => 0,
  "free_mem" => 7535996,
  ...
}
{
  "host" => "hostname",
  "timestamp" => "Tue Apr 05 01:33:15 EDT 2016",
  "r" => 0,
  "w" => 0,
  "free_mem" => 7535998,
  ...
}