Multiline and grok filters to merge multi-line logs and then filter

Hi experts

I am very new to the ELK stack and still learning. I have some logs streamed from a switch reporting congestion; the format is shown below. There are Start, Update, and End type records, with timestamp, interface name, queueSize, and txLatency as the important parameters to extract.

CongestionRecord:
   timestamp: Thu Jun 29 10:35:12.9282 2017
   intfName: Ethernet2
   switchId: 0
   portId: 20
   queueSize: 519
   entryType: Start
   trafficClass: 1
   txLatency: 1137.528
CongestionRecord:
   timestamp: Thu Jun 29 10:35:12.9292 2017
   intfName: Ethernet2
   switchId: 0
   portId: 20
   queueSize: 526
   entryType: Update
   trafficClass: 1
   txLatency: 1155.096
CongestionRecord:
   timestamp: Thu Jun 29 10:35:12.11725 2017
   intfName: Ethernet2
   switchId: 0
   portId: 20
   queueSize: 1976
   entryType: End
   trafficClass: 1
   timeOfMaxQLen: 10548
   txLatency: 4339.296

I used the multiline filter to merge each record into a single message.

filter {
  multiline {
    pattern => "^CongestionRecord:"
    what => "previous"
    negate => true
  }
}

An extract from the output is given below.

"_index": "congestion",
"_type": "log",
"_id": "AVzyKL4KfBsK2dyTIqlw",
"_score": null,
"_source": {
"message": "CongestionRecord:\n timestamp: Thu Jun 29 14:25:43.14614 2017\n intfName: Ethernet3\n switchId: 0\n portId: 41\n queueSize: 645\n entryType: Update\n trafficClass: 1\n txLatency: 1324.064",
"@version": "1",
"@timestamp": "2017-06-29T04:25:43.156Z",
"source": "/var/log/arista-streaming.log",

How can I extract all the fields from the above message as separate fields?

I'd appreciate a quick response.

Have you tried using a kv filter? The newline characters might stir things up, but apart from that it should be pretty straightforward.
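For example, a minimal sketch (untested; it assumes the multiline filter has already merged each record into a single message):

kv {
  source      => "message"  # parse the merged multiline event
  field_split => "\n"       # one key/value pair per line
  value_split => ":"        # key and value are separated by the first colon
}

The timestamp values contain colons of their own, but kv only splits on the first one, so they should come through intact.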

Thanks Magnus.

I was able to separate the fields using the kv filter as you suggested.

"message": "CongestionRecord:\n   timestamp: Fri Jun 30 16:02:24.14190 2017\n   intfName: Ethernet3\n   switchId: 0\n   portId: 41\n   queueSize: 268\n   entryType: Update\n   trafficClass: 1\n   txLatency: 855.456",
"@version": "1",
"@timestamp": "2017-07-02T01:49:51.286Z",
"path": "/var/log/arista-newtest.log",
"host": "ansible-rh73",
"tags": [
  "multiline"
],
"   timestamp": "Fri Jun 30 16:02:24.14190 2017",
"   intfName": "Ethernet3",
"   switchId": "0",
"   portId": "41",
"   queueSize": "268",
"   entryType": "Update",
"   trafficClass": "1",
"   txLatency": "855.456"

The filter I used is:

filter {
  multiline {
    pattern => "^CongestionRecord:"
    what => "previous"
    negate => true
  }

  kv {
    field_split => "\n"
    value_split => ":"
    source => "message"
  }

  mutate {
    convert => {
      "queueSize"    => "integer"
      "txLatency"    => "float"
      "trafficClass" => "integer"
      "switchId"     => "integer"
      "portId"       => "integer"
    }
  }
}

But then I see another issue: even though I am converting the fields to numbers, they are still shown as strings in Kibana. I have to plot graphs of queueSize and txLatency per port against time, so I need those fields to be numeric. Any ideas how to do that?

The mappings of existing fields can't be changed without reindexing. If you use daily indexes the problem should be fixed as the next day's index is created.
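For example, a dated index name in the elasticsearch output (a sketch; the index prefix is just an example):

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "congestion-%{+YYYY.MM.dd}"  # a new index is created each day, picking up the current field types
  }
}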

Hi

I changed the index to a dated one. I also tried creating a new index, but the problem persisted.

It seems the string output is coming from Logstash itself: the values are emitted as strings instead of integers.

{
  "@timestamp" => "2017-07-04T00:58:23.747Z",
  "path" => "/var/log/arista-new.log",
  "host" => "ansible-rh73",
  "tags" => [],
  "   timestamp" => "Tue Jul 4 10:03:53.85655 2017",
  "   intfName" => "Ethernet2",
  "   switchId" => "0",
  "   portId" => "20",
  "   queueSize" => "675",
  "   entryType" => "Update",
  "   trafficClass" => "1",
  "   txLatency" => "2678.4"
}

Here is the configuration again.

input {
  file {
    path => "/var/log/arista-new.log"
    start_position => "beginning"
  }
}

filter {
  multiline {
    pattern => "^CongestionRecord:"
    what => "previous"
    negate => true
  }

  kv {
    field_split => "\n"
    value_split => ":"
    source => "message"
  }

  mutate {
    convert => {
      "queueSize"    => "integer"
      "txLatency"    => "float"
      "trafficClass" => "integer"
      "switchId"     => "integer"
      "portId"       => "integer"
    }
  }

  mutate {
    remove_field => [ "message", "@version" ]
    remove_tag => [ "multiline" ]
  }
}

output {
  stdout { codec => rubydebug }
}

I changed the output to rubydebug to test what Logstash emits. My actual output configuration is:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "arista-%{+YYYY.MM.dd}"
  }
}


You have a leading space in your field names, which is also why your mutate/convert filters have no effect: they reference queueSize, but the field the kv filter actually created is "   queueSize". Look into the kv filter's trim_key option.
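A minimal sketch of the adjusted kv filter (untested):

kv {
  source      => "message"
  field_split => "\n"
  value_split => ":"
  trim_key    => " "  # strip the leading whitespace from the parsed key names
}

With the keys trimmed, the existing mutate/convert filters will match the fields and the values will be emitted as real numbers.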
