Problem with the multiline filter plugin when parsing netstat output

Hi,
I have to parse netstat ("Network Statistics") output using Logstash.

Sample entries look like the ones below:

TIME: 16:14:30.14

Active Connections

Proto Local Address Foreign Address State
TCP 0.0.0.0:66 0.0.0.0:0 LISTENING
TCP 0.0.0.0:445 0.0.0.0:0 LISTENING
TCP 0.0.0.0:5357 0.0.0.0:0 LISTENING
TCP 0.0.0.0:9937 0.0.0.0:0 LISTENING
TCP 0.0.0.0:12372 0.0.0.0:0 LISTENING
TCP 0.0.0.0:49664 0.0.0.0:0 LISTENING
TCP 0.0.0.0:49665 0.0.0.0:0 LISTENING
TCP 0.0.0.0:49666 0.0.0.0:0 LISTENING
TCP 0.0.0.0:49667 0.0.0.0:0 LISTENING
TCP 0.0.0.0:49668 0.0.0.0:0 LISTENING
TCP 0.0.0.0:49679 0.0.0.0:0 LISTENING

Here I have to attach TIME: 16:14:30.14 to all the lines and push it into the DB, so I used 'multiline' as below:

filter {
  multiline {
    pattern => "^%{WORD:}: %{TIME}"
    negate => true
    what => "previous"
  }
  grok {
    match => {
      "message" => '^%{WORD:}: %{TIME:time} %{GREEDYDATA} %{WORD:protocol}%{SPACE}%{IPORHOST:localaddress}:%{POSINT:port}%{SPACE}%{IPORHOST:ForeignAddress}:%{NUMBER:port2}%{SPACE}%{WORD:state}'
    }
  }
}

Sadly, the results from the above configuration are unexpected: after matching the time it takes all the lines as a single event, but I want to parse each line separately and push it into ES as individual docs.

Please help me.

The multiline filter has been deprecated, so you should not use it. You should always look to perform multiline processing as close to the source as possible. If you are using Filebeat, then that is where it should be done. If you need to do it in Logstash, you should instead use the multiline codec.
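
For example, a minimal sketch of a file input using the multiline codec (the path is a placeholder, and it assumes every netstat block in the file starts with a TIME: line):

input {
  file {
    path => "/path/to/netstat.log"
    # lines that do NOT start with "TIME:" are appended to the previous event,
    # so each TIME: header plus its connection lines becomes one event
    codec => multiline {
      pattern => "^TIME:"
      negate => true
      what => "previous"
    }
  }
}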

But the functionality is working fine; my issue is just with

pattern, negate, what

Multiline just groups lines, it does not take a line and add it to others. For this you probably need to group it all into one event, extract the timestamp and other data from the initial line before splitting up the event using a split or ruby filter.
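
As a rough sketch of the split step (assuming the whole netstat block has already been grouped into one event and the time has already been extracted from its first line):

split {
  # by default this splits the "message" field on "\n", emitting one event per line;
  # fields already on the event (such as the extracted time) are copied to every new event
}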

Could you please explain in depth? I have not worked with the ruby filter. Please give an example of how to group all the data into a single event and break it up using ruby.

Can you show what you expect the resulting events to look like?

The input is as below:

TIME: 16:14:30.14

Active Connections

Proto Local Address Foreign Address State
TCP 0.0.0.0:80 0.0.0.0:0 LISTENING
TCP 0.0.0.0:2 0.0.0.0:0 ESTABLISHED

..................................................................................................................
TIME: 16:14:40.13

Active Connections

Proto Local Address Foreign Address State
TCP 0.0.0.0:3 0.0.0.0:0 LISTENING

I want the output to be:

{
time : 16:14:30.14
Protocol : TCP
localAdd : 0.0.0.0:80
Foreign Address : 0.0.0.0:0
state : LISTENING
}

{
time : 16:14:30.14
Protocol : TCP
localAdd : 0.0.0.0:2
Foreign Address : 0.0.0.0:0
state : ESTABLISHED
}

{
time : 16:14:40.13
Protocol : TCP
localAdd : 0.0.0.0:3
Foreign Address : 0.0.0.0:0
state : LISTENING
}

Try something like this:

filter {
  # Parse the time from the first line of the multiline event; "%{}" swallows the rest of the block.
  dissect {
    mapping => {
      "message" => "TIME: %{time}
%{}"
    }
  }

  # Split the event into one event per line; each new event keeps the "time" field.
  split {}

  # Only the connection lines are parsed further; everything else (headers, blank lines) is dropped.
  if [message] =~ /^(TCP|UDP)/ {
    dissect {
      mapping => {
        "message" => "%{protocol} %{local_address} %{foreign_address} %{state}"
      }
    }
  } else {
    drop {}
  }
}
Sorry!! I didn't understand the code. I tried to run it, but it is giving some warning messages:

[2018-06-14T18:06:43,046][WARN ][org.logstash.dissect.Dissector] Dissector mapping, field found in event but it was empty {"field"=>"message", "event"=>{"@version"=>"1", "tags"=>["_dissectfailure"],

The first dissect pattern just parses out the time field. You can replace this with grok if you want.
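
For instance, a grok equivalent of that first dissect could look something like this (a sketch; it only pulls the time out of the first line of the grouped event):

grok {
  match => { "message" => "^TIME: %{TIME:time}" }
}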

The second filter splits the event by line, and all resulting events keep the time field that was previously parsed out.

Then we drop all lines that do not start with TCP or UDP and parse the remaining lines using a dissect filter.

You're awesome... you made my day...

It's working great... thanks a lot!!!

But there is one small issue: the time field is coming up empty.

{
"protocol" => "TCP",
"foreign_address" => "0.0.0.0:0",
"time" => "",
"@timestamp" => 2018-06-14T13:31:16.898Z,
"local_address" => "0.0.0.0:2",
"message" => "TCP 0.0.0.0:2 0.0.0.0:0 ESTABLISHED",
"state" => "ESTABLISHED",
"path" => "/home/avk03/JarAndZip/jar-file/config/logstash-6.1.1/Logs/try.txt",
"host" => "avk03-Vostro-3800",
"@version" => "1"
}

Hi @Christian_Dahlqvist thanks for your help.

I am facing some spacing issues while using the dissect filter plugin.

When my log entry is as below

TCP 0.0.0.0:66 0.0.0.0:0 LISTENING (without a space at the beginning of the line)

the parser you suggested works fine.

But when a line has a space at the beginning, as below:

  TCP 0.0.0.0:66 0.0.0.0:0 LISTENING (with a space at the beginning of the line)

the parser throws warning messages like:

[2018-06-15T15:01:31,539][WARN ][org.logstash.dissect.Dissector] Dissector mapping, field found in event but it was empty {"field"=>"message", "event"

Could you please help me with how to match whitespace when using the dissect filter plugin?

OK. Then try to change the latter part to something like this:

  if [message] =~ /^\s+(TCP|UDP)/ {
    dissect {
      mapping => {
        # the leading "%{?tmp->}" soaks up the indentation at the start of the line without keeping it as a field,
        # and the "->" suffix lets each key absorb the variable run of spaces that follows it
        "message" => "%{?tmp->} %{Protocol->} %{localAdd->} %{Foreign Address->} %{state}"
      }
    }
  } else {
    drop {}
  }
No :frowning: it's giving the wrong output:

{
"Protocol" => "TCP",
"localAdd" => "",
"time" => "",
"path" => "/home/avk03/JarAndZip/jar-file/config/logstash-6.1.1/Logs/try.txt",
"state" => " 0.0.0.0:445 0.0.0.0:0 LISTENING",
"@timestamp" => 2018-06-15T10:08:19.779Z,
"@version" => "1",
"message" => " TCP 0.0.0.0:445 0.0.0.0:0 LISTENING",
"host" => "avk03-Vostro-3800",
"Foreign Address" => ""
}

I updated it as I noticed you have a variable number of spaces between the fields as well. If it is still not working, read through the dissect documentation and go through the config step by step as described in this blog post.

Thanks a lot... it is working as I expected :slight_smile:

{
"message" => " TCP 0.0.0.0:445 0.0.0.0:0 LISTENING",
"host" => "avk03-Vostro-3800",
"Protocol" => "TCP",
"Foreign Address" => "0.0.0.0:0",
"state" => "LISTENING",
"path" => "/home/avk03/JarAndZip/jar-file/config/logstash-6.1.1/Logs/try.txt",
"@version" => "1",
"@timestamp" => 2018-06-15T10:18:52.702Z,
"time" => "",
"localAdd" => "0.0.0.0:445"
}

Hi @Christian_Dahlqvist

Actually, this parser is not reading the lines which contain the TIME data.

{
"state" => "LISTENING",
"time" => "",
"@version" => "1",
"message" => " TCP 100.113.26.16:139 0.0.0.0:0 LISTENING",
"Foreign Address" => "0.0.0.0:0",
"host" => "avk03-Vostro-3800",
"@timestamp" => 2018-06-15T11:17:49.699Z,
"path" => "/home/avk03/JarAndZip/jar-file/config/logstash-6.1.1/Logs/try.txt",
"Protocol" => "TCP",
"localAdd" => "100.113.26.16:139"
}
{
"state" => "ESTABLISHED",
"time" => "",
"@version" => "1",
"message" => " TCP 100.113.26.16:54120 111.221.29.101:443 ESTABLISHED",
"Foreign Address" => "111.221.29.101:443",
"host" => "avk03-Vostro-3800",
"@timestamp" => 2018-06-15T11:17:49.701Z,
"path" => "/home/avk03/JarAndZip/jar-file/config/logstash-6.1.1/Logs/try.txt",
"Protocol" => "TCP",
"localAdd" => "100.113.26.16:54120"
}

I have tried many ways and went through the documentation, but no luck; it is only reading the TCP|UDP lines into message. I am not getting why the first part of the dissect is not functioning...!!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.