How to create two kv filters


(John) #1

Hi,
I have two syslog input feeds from two sources.
One is comma-delimited and the other is space-delimited.

So, if I configure only one input on UDP port 5514, how do I split these two feeds using two filters, as below?

filter {
  if [type] == "firewall1" {
    kv {
      field_split => ","
    }
  } else if [type] == "firewall2" {
    kv {
      field_split => " "
    }
  }
}
}


(Nachiket) #2

What you have mentioned should work just fine. What is the issue?

A sample input with an expected output would probably help.


(John) #3

Thanks, NerdSec.
This is the current config.
If I do it like this, it hits the first one and not the second one (syslog). How do I correct this?

  • One input (udp=5514, type=syslog)
  • Two filters (one for comma and another for space)

input {
  udp {
    port => 5514
    type => "syslog"
    codec => json
  }
}

filter {
  mutate {
    gsub => [
      "message", ": ", ":",
      "message", "^<[0-9][0-9][0-9]>", ""
    ]
  }

  if [type] == "firewall1" {
    kv {
      field_split => ","
    }
  } else if [type] == "syslog" {
    kv {
      field_split => " "
    }
  }

  if [msg] {
    mutate {
      replace => [ "message", "%{msg}" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}


(Magnus Bäck) #4

You're always setting type to "syslog" so [type] == "firewall1" will never be true. Are there any other characteristics of the message that would allow a classification? This'll be easier if you show us examples of both kinds of messages.


(John) #5

Thanks, Magnus.
This is the part I am having an issue with.
I have two sources; how do I tell them apart when using a single syslog UDP port, 5514?

Both inputs are from firewalls. Should I use two UDP ports instead of one?
Or use only one UDP port and filter by host?
filter {
if [host] == "10.50.0.17"


(Magnus Bäck) #6

As I said, this'll be easier if you show us examples of both kinds of messages.


(John) #7

Hi Magnus,

Here it is.
2018-02-17 01:14:59 System4.Notice x.x.x.x date=2018-02-17 time=01:14:59 devname=FIREWALL1 devid=FIREWALL1 logid=0001000014 type=traffic subtype=local level=notice vd=root srcip=x.x.x.x srcport=137 srcintf="VL1" dstip=x.x.x.x dstport=137 dstintf=unknown-0 sessionid=xxxxxxxxx proto=17 action=deny policyid=0 dstcountry="Reserved" srccountry="Reserved" trandisp=noop service="AD Services" app="netbios forward" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat="unscanned"

2018-02-17 01:14:59 Syslog.Notice x.x.x.x Feb 17 01:14:59 FIREWALL2 auditd: date="2018-02-16 17:14:59 +0000",fac=f_kernel_ipfilter,area=a_general_area,type=t_nettraffic,pri=p_major,hostname=FIREWALL2,event="session timeout",application=TCP-8888,netsessid=xxxxxxxxxx,srcip=x.x.x.x,srcport=52321,srczone=INTERNAL,protocol=6,dstip=x.x.x.x,dstport=8021,dstzone=DMZ,bytes_written_to_client=9980,bytes_written_to_server=1350,rule_name="INTERNET ACCESS",cache_hit=0,start_time="2018-02-16 15:12:58 +0000"


(Magnus Bäck) #8

Okay, so use e.g. a grok filter to separate "2018-02-17 01:14:59 System4.Notice x.x.x.x" from the actual payload, then inspect the payload with a conditional. Does it begin with "date="? Then it's one kind of syslog message, otherwise it's of the other kind.
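
A minimal sketch of that approach, assuming the messages look exactly like the two samples in post #7. The field names received_at, facility, sender, payload and kv_values are invented for illustration, and the dissect mapping assumes a two-digit day and single spaces in the inner syslog header:

filter {
  # Peel off the leading "2018-02-17 01:14:59 System4.Notice x.x.x.x " part.
  grok {
    match => {
      "message" => "^%{TIMESTAMP_ISO8601:received_at} %{NOTSPACE:facility} %{NOTSPACE:sender} %{GREEDYDATA:payload}"
    }
  }

  if [payload] =~ /^date=/ {
    # First kind: space-separated key=value pairs.
    kv {
      source => "payload"
      field_split => " "
    }
  } else {
    # Second kind: strip "Feb 17 01:14:59 FIREWALL2 auditd: " first,
    # then split the remainder on commas.
    dissect {
      mapping => {
        "payload" => "%{?month} %{?day} %{?time} %{devname} %{process}: %{kv_values}"
      }
    }
    kv {
      source => "kv_values"
      field_split => ","
    }
  }
}

Events that do not match the grok pattern get the _grokparsefailure tag and no payload field, so they would fall into the else branch; checking for that tag first is probably worthwhile.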


(John) #9

Actually, the first one, FIREWALL1, is working well with the config below. No kv options are applied yet.
Only FIREWALL2 is having an issue now. If I use the kv filter with "," as field_split, it works well.

input {
  udp {
    port => 5514
    type => "syslog"
    codec => json
  }
}

filter {
  mutate {
    gsub => [
      "message", ": ", ":",
      "message", "^<[0-9][0-9][0-9]>", ""
    ]
  }

  kv { }

  if [msg] {
    mutate {
      replace => [ "message", "%{msg}" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}


(Guy Boertje) #10

@icguy

What is the json codec used for?

You should really follow @magnusbaeck's advice.

You need to isolate the KV portion of the log message into a separate field. You should also grok or dissect the leading pieces, "2018-02-17 01:14:59 System4.Notice x.x.x.x" and "2018-02-17 01:14:59 Syslog.Notice x.x.x.x Feb 17 01:14:59 FIREWALL2 auditd", into fields of their own. Then you can use a conditional to apply the correct kv filter.


(John) #11

@guyboertje.
I am not very good at grok and other scripting languages.
I'll try my best to look for other ways.


(Guy Boertje) #12

Simply ask for help with Grok or Dissect de-structuring here. :smiley:

When we saw that you were using UDP, kv and mutate in a config, perhaps we assumed that you were a more experienced Logstash user.

Also, with only one example of each message format, it's not easy to give a definitive grok or dissect pattern.


(John) #13

@guyboertje ha ha.. No.... I am just a newbie, learning Logstash and ELK.
I just want to capture two firewall logs and am now stuck :smiley::grinning:


(Guy Boertje) #14

OK. Fair enough. Let's start helping...

Give me three examples of each format in their original JSON as strings.

I like to use the generator input with the strings as it makes experimenting much easier. Here is an example of what I mean:

input {
  generator {
    lines => [
      '{"message":"2018-02-17 01:14:59 Syslog.Notice x.x.x.x Feb 17 01:14:59 FIREWALL2 auditd: date=\"2018-02-16\""}'
    ]
    count => 1
  }
}

filter {
  json {
    source => "message"
  }
  dissect {
    mapping => {
      message => "%{date} %{time} %{message_source} %{ip_address} %{month} %{day} %{time_other} %{hardware_source} %{process}: %{kv_values}"
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Gives:

{
               "date" => "2018-02-17",
               "host" => "Elastics-MacBook-Pro.local",
         "time_other" => "01:14:59",
           "sequence" => 0,
         "ip_address" => "x.x.x.x",
                "day" => "17",
               "time" => "01:14:59",
    "hardware_source" => "FIREWALL2",
            "message" => "2018-02-17 01:14:59 Syslog.Notice x.x.x.x Feb 17 01:14:59 FIREWALL2 auditd: date=\"2018-02-16\"",
     "message_source" => "Syslog.Notice",
              "month" => "Feb",
            "process" => "auditd",
           "@version" => "1",
          "kv_values" => "date=\"2018-02-16\"",
         "@timestamp" => 2018-04-26T15:59:33.366Z
}

(John) #15

Thanks, @guyboertje
Let me get back to you.


(John) #16

@guyboertje
Any idea how to test the one you mentioned previously?

I have just checked my syslog details, and the formats that I posted before are wrong. Here are the correct ones.

Note the values in angle brackets at the front of the logs.

<189>date=2018-04-28 time=12:21:13 devname=FIREWALL1 devid=xxxxxxxx logid=0000000013 type=traffic subtype=forward level=notice vd=root srcip=x.x.x.x srcport=137 srcintf="VL_MGMT" dstip=x.x.x.x dstport=137 dstintf="port9" poluuid=xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx sessionid=1095303991 proto=17 action=deny policyid=498 dstcountry="Reserved" srccountry="Reserved" trandisp=noop service="HTTP Servcice" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat="unscanned" crscore=30 craction=131072 crlevel=high

<45>Apr 28 12:21:14 FIREWALL2 auditd: date="2018-04-28 04:21:14 +0000",fac=f_kernel_ipfilter,area=a_general_area,type=t_nettraffic,pri=p_major,hostname=FIREWALL2.domain.local,event="session end",application=TCP-443,netsessid=b26845ae3f6b9,src_geo=XX,srcip=x.x.x.x,srcport=35478,srczone=VL_EXT,protocol=6,dstip=x.x.x.x,dstport=443,dstzone=VL_1,bytes_written_to_client=324,bytes_written_to_server=848,rule_name="TESTING RULE",cache_hit=0,start_time="2018-04-28 04:21:13 +0000"


(Guy Boertje) #17

@lcguy

So the firewalls are not sending JSON via UDP, just plain formatted strings. I also see that you got some help with your firewalls, Fortigate here and Sidewinder here. Some of that advice I have retained and some I have done a little differently...

IMPORTANT!!!! Read my follow-up notes post before diving in and trying the config below.

input {
  generator {
    lines => [
      '<189>date=2018-04-28 time=12:21:13 devname=FIREWALL1 devid=xxxxxxxx logid=0000000013 type=traffic subtype=forward level=notice vd=root srcip=x.x.x.x srcport=137 srcintf="VL_MGMT" dstip=x.x.x.x dstport=137 dstintf="port9" poluuid=xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx sessionid=1095303991 proto=17 action=deny policyid=498 dstcountry="Reserved" srccountry="Reserved" trandisp=noop service="HTTP Servcice" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat="unscanned" crscore=30 craction=131072 crlevel=high',
      '<45>Apr 28 12:21:14 FIREWALL2 auditd: date="2018-04-28 04:21:14 +0000",fac=f_kernel_ipfilter,area=a_general_area,type=t_nettraffic,pri=p_major,hostname=FIREWALL2.domain.local,event="session end",application=TCP-443,netsessid=b26845ae3f6b9,src_geo=XX,srcip=x.x.x.x,srcport=35478,srczone=VL_EXT,protocol=6,dstip=x.x.x.x,dstport=443,dstzone=VL_1,bytes_written_to_client=324,bytes_written_to_server=848,rule_name="TESTING RULE",cache_hit=0,start_time="2018-04-28 04:21:13 +0000"'
    ]
    count => 1
  }
}

filter {
  dissect {
    mapping => {
      message => "<%{priority}>%{rest}"
    }
  }
  if [rest] =~ /^\w{3}\s/ {
    dissect {
      mapping => {
        # using skip fields for month day and time because in kv 'date' has the UTC based timestamp
        rest => "%{?month} %{?day} %{?time} %{devname} %{process_name}: %{rest}"
      }
      add_tag => ["format2"]
    }
  }
  if "format2" in [tags] {
    kv {
      source => "[rest]"
      value_split => "="
      field_split => ","
      remove_field => [ "[message]", "[rest]" ]
    }
    mutate {
      rename => { "protocol" => "proto"}
      # !!!!! rename or remove any other fields here (not date though).
    }
    date {
      match => [ "date", "yyyy-MM-dd HH:mm:ss Z" ]
    }
  } else {
    # handle first format here
    kv {
      source => "[rest]"
      value_split => "="
      field_split => " "
      remove_field => [ "[message]", "[rest]" ]
    }
    mutate {
      # combine the date and time and add the timezone
      replace => { "date" => "%{date}T%{time}+0800" }
      # !!!!! rename or remove any other fields here (not date though).
      # add_field, remove_field etc always runs after any other mutate operations
      remove_field => [ "[time]" ]
    }
    date {
      match => [ "date", "ISO8601" ]
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

This gives...

{
            "vd" => "root",
      "sequence" => 0,
      "sentbyte" => "0",
       "subtype" => "forward",
       "srcintf" => "VL_MGMT",
      "policyid" => "498",
          "date" => "2018-04-28T12:21:13+0800",
         "level" => "notice",
    "srccountry" => "Reserved",
       "crscore" => "30",
         "devid" => "xxxxxxxx",
       "crlevel" => "high",
    "@timestamp" => 2018-04-28T04:21:13.000Z,
    "dstcountry" => "Reserved",
       "dstport" => "137",
      "@version" => "1",
       "sentpkt" => "0",
     "sessionid" => "1095303991",
        "appcat" => "unscanned",
       "devname" => "FIREWALL1",
         "srcip" => "x.x.x.x",
         "dstip" => "x.x.x.x",
       "poluuid" => "xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx",
      "trandisp" => "noop",
       "service" => "HTTP Servcice",
      "rcvdbyte" => "0",
       "dstintf" => "port9",
          "type" => "traffic",
        "action" => "deny",
      "priority" => "189",
          "host" => "Elastics-MacBook-Pro.local",
         "logid" => "0000000013",
       "srcport" => "137",
      "craction" => "131072",
      "duration" => "0",
         "proto" => "17"
}
{
                   "sequence" => 0,
                 "start_time" => "2018-04-28 04:21:13 +0000",
                       "date" => "2018-04-28 04:21:14 +0000",
                    "src_geo" => "XX",
                 "@timestamp" => 2018-04-28T04:21:14.000Z,
                   "hostname" => "FIREWALL2.domain.local",
                    "dstport" => "443",
                   "@version" => "1",
                       "area" => "a_general_area",
                  "rule_name" => "TESTING RULE",
                       "tags" => [
        [0] "format2"
    ],
                    "devname" => "FIREWALL2",
               "process_name" => "auditd",
                  "cache_hit" => "0",
                      "srcip" => "x.x.x.x",
                        "pri" => "p_major",
                  "netsessid" => "b26845ae3f6b9",
                      "dstip" => "x.x.x.x",
    "bytes_written_to_server" => "848",
                    "dstzone" => "VL_1",
                       "type" => "t_nettraffic",
                   "priority" => "45",
                    "srczone" => "VL_EXT",
                       "host" => "Elastics-MacBook-Pro.local",
                    "srcport" => "35478",
    "bytes_written_to_client" => "324",
                      "proto" => "6",
                      "event" => "session end",
                "application" => "TCP-443",
                        "fac" => "f_kernel_ipfilter"
}

(John) #18

Thanks, @guyboertje.
Sorry, how do I test this to get output like the one you showed?

If it all works, can I change the input to UDP port 5514 and the type to syslog?


(Guy Boertje) #19

NOTES TO PREVIOUS POST.

  1. I used two dissect filters: one for all messages, and one for when the rest field starts with a three-letter month and a space. Because the second dissect applies only to the second format, I added a format2 tag in it.
  2. I used the format2 tag to create a conditional section that handles format2 logs, with the else section handling the other lines. This can be done in one step (config below), but I included it in the previous post to show how to use tags to control which filters get applied to matching log lines.
  3. I renamed the protocol field to proto in the second format as an example.
  4. The example uses the generator input. This should eventually be replaced with the udp input, but leave it in until you are sure your events/documents are in the correct shape.

However, you still have one more important task:

You have to decide on a common schema; this is important for Kibana visualizations. The two formats have some field names in common and some that differ. You should "normalize" this by renaming fields, adding fields to one format or removing fields from the other until you have one set of fields that mean the same thing and whose values are in the same units.
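
As a rough illustration of that normalization, here is a fragment that could sit alongside the existing mutate filters. Pairing bytes_written_to_server with sentbyte and bytes_written_to_client with rcvdbyte is only a guess at what those counters mean; check each vendor's log reference before adopting it. The integer conversions assume you want numeric port and protocol values, since kv emits everything as strings:

# inside the branch that handles the second (comma-separated) format
mutate {
  rename => {
    # hypothetical mapping, not confirmed by either firewall's documentation
    "bytes_written_to_server" => "sentbyte"
    "bytes_written_to_client" => "rcvdbyte"
  }
}

# after both branches, so it applies to every event
mutate {
  convert => {
    "srcport" => "integer"
    "dstport" => "integer"
    "proto" => "integer"
  }
}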

One conditional section version...

input {
  generator {
    lines => [
      '<189>date=2018-04-28 time=12:21:13 devname=FIREWALL1 devid=xxxxxxxx logid=0000000013 type=traffic subtype=forward level=notice vd=root srcip=x.x.x.x srcport=137 srcintf="VL_MGMT" dstip=x.x.x.x dstport=137 dstintf="port9" poluuid=xxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx sessionid=1095303991 proto=17 action=deny policyid=498 dstcountry="Reserved" srccountry="Reserved" trandisp=noop service="HTTP Servcice" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat="unscanned" crscore=30 craction=131072 crlevel=high',
      '<45>Apr 28 12:21:14 FIREWALL2 auditd: date="2018-04-28 04:21:14 +0000",fac=f_kernel_ipfilter,area=a_general_area,type=t_nettraffic,pri=p_major,hostname=FIREWALL2.domain.local,event="session end",application=TCP-443,netsessid=b26845ae3f6b9,src_geo=XX,srcip=x.x.x.x,srcport=35478,srczone=VL_EXT,protocol=6,dstip=x.x.x.x,dstport=443,dstzone=VL_1,bytes_written_to_client=324,bytes_written_to_server=848,rule_name="TESTING RULE",cache_hit=0,start_time="2018-04-28 04:21:13 +0000"'
    ]
    count => 1
  }
}

filter {
  dissect {
    mapping => {
      message => "<%{priority}>%{rest}"
    }
  }
  if [rest] =~ /^\w{3}\s/ {
    dissect {
      mapping => {
        # using skip fields for month day and time because in kv 'date' has the UTC based timestamp
        rest => "%{?month} %{?day} %{?time} %{devname} %{process_name}: %{rest}"
      }
    }
    kv {
      source => "[rest]"
      value_split => "="
      field_split => ","
      remove_field => [ "[message]", "[rest]" ]
    }
    mutate {
      rename => { "protocol" => "proto"}
      # !!!!! rename or remove any other fields here (not date though).
    }
    date {
      match => [ "date", "yyyy-MM-dd HH:mm:ss Z" ]
    }
  } else {
    # handle first format here
    kv {
      source => "[rest]"
      value_split => "="
      field_split => " "
      remove_field => [ "[message]", "[rest]" ]
    }
    mutate {
      # combine the date and time and add the timezone
      replace => { "date" => "%{date}T%{time}+0800" }
      # !!!!! rename or remove any other fields here (not date though).
      # add_field, remove_field etc always runs after any other mutate operations
      remove_field => [ "[time]" ]
    }
    date {
      match => [ "date", "ISO8601" ]
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

(Guy Boertje) #20

Copy the config to an editor and save it. I use Sublime Text with the Ruby syntax (it gives better feedback on comments, opening and closing curly braces, etc.). On my MacBook it is in ~/tmp/testing/confs/test-json-kv.conf. Then run it from the Logstash directory:

bin/logstash -f ~/tmp/testing/confs/test-json-kv.conf
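
Once the rubydebug output looks right, the generator input can be swapped for the udp input, as note 4 in the previous post suggests. A minimal sketch, assuming the firewalls keep sending to port 5514:

input {
  udp {
    # plain strings arrive here, so no "codec => json"
    port => 5514
  }
}

This sketch leaves out type => "syslog" because both firewall formats carry their own type= key, which could clash with a type set on the input. The filter section stays as it is; the elasticsearch output from the earlier config can be added back next to stdout when you are ready to index.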