[SOLVED] Grok and split fields

Hello,

I want to put logs from a FortiGate firewall into Elasticsearch.
My FortiGate sends its logs to my syslog-ng server, and Logstash reads the resulting log file. I use a grok filter to parse each line, but I don't understand how to split the "message" into separate fields.

My Logstash configuration is:

input {
        file {
                path => "/var/log/firewall/heimdall.log"
                type => "linux-syslog"
        }
}
filter {
        grok {
                match => {
                        "message" => '%{SYSLOGTIMESTAMP} %{IPV4:iphost} date=%{YEAR}-%{MONTHNUM}-%{MONTHDAY} time=%{TIME:timelog} devname=NQHQ-Heimdall devid=FGT90D3Z15019032 logid=0000000013 type=%{WORD:type} subtype=%{WORD:subtype} level=%{WORD:level} vd=root srcip=%{IPV4:srcip} srcport=%{INT:srcport} srcintf=\\"%{WORD:srcintf}\\" dstip=%{IP:dstip} dstport=%{INT:dstport} dstintf=\\"%{WORD:dstintf}\\" sessionid=%{INT:sessionid} proto=%{INT:proto} action=%{WORD:action} policyid=%{INT:policyid} dstcountry=\\"%{WORD:dstcountry}\\" srccountry=\\"%{WORD:srccountry}\\" trandisp=noop service=\\"%{WORD:service}\\" duration=%{INT:duration} sentbyte=%{INT:sendbyte} rcvdbyte=%{INT:rcvbyte} sentpkt=%{INT:sendpkt} appcat=\\"%{WORD:appcat}\\" crscore=%{INT:crscore} craction=%{INT:craction} crlevel=%{WORD:crlevel}'
                }
        }
}

output {
        elasticsearch {
                codec => "json" 
                hosts => ["127.0.0.1:9200"]
                index => "heimdall"
        }
        stdout { codec => rubydebug }
}

My grok filter works only partially.
When I use the full pattern it does not match at all, apparently because of the double quotes, and I have not found out how to parse those. For my tests, I cut the pattern down like this:
"message" => '%{SYSLOGTIMESTAMP} %{IPV4:iphost} date=%{YEAR}-%{MONTHNUM}-%{MONTHDAY} time=%{TIME:timelog} devname=NQHQ-Heimdall devid=FGT90D3Z15019032 logid=0000000013 type=%{WORD:type} subtype=%{WORD:subtype} level=%{WORD:level} vd=root srcip=%{IPV4:srcip} srcport=%{INT:srcport}

But the message is not split into separate fields in Elasticsearch.
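
(A minimal test pipeline like the sketch below, reading events from stdin and only printing the parsed result, makes it quicker to iterate on a grok pattern than going through the whole file-to-Elasticsearch chain; the rest field name is only illustrative. Paste a sample log line into the terminal to see which fields the pattern captures:)

input {
        stdin { }
}

filter {
        grok {
                match => {
                        "message" => '%{SYSLOGTIMESTAMP} %{IPV4:iphost} %{GREEDYDATA:rest}'
                }
        }
}

output {
        # rubydebug prints every field of the event, so a non-matching
        # pattern shows up immediately as a _grokparsefailure tag.
        stdout { codec => rubydebug }
}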

This is an example of my log:
Sep 19 17:06:53 192.168.0.254 date=2016-09-19 time=17:05:12 devname=devname devid=XXXXXXXX logid=XXXXXXXXXX type=traffic subtype=forward level=notice vd=root srcip=X.X.X.X srcport=50903 srcintf="interfacename" dstip=X.X.X.X dstport=443 dstintf="wan1" sessionid=19515171 proto=17 action=deny policyid=0 dstcountry="United States" srccountry="Reserved" trandisp=noop service="udp/443" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat="unscanned" crscore=30 craction=131072 crlevel=high

And here is the message after Logstash has read it:
Sep 19 17:06:53 192.168.0.254 date=2016-09-19 time=17:05:12 devname=devname devid=XXXXXXXX logid=XXXXXXXXXX type=traffic subtype=forward level=notice vd=root srcip=X.X.X.X srcport=50903 srcintf=\"interfacename\" dstip=X.X.X.X dstport=443 dstintf=\"wan1\" sessionid=19515171 proto=17 action=deny policyid=0 dstcountry=\"United States\" srccountry="Reserved" trandisp=noop service="udp/443" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat=\"unscanned\" crscore=30 craction=131072 crlevel=high

A backslash is written before each double quote.

Thanks in advance.


Please show an example event produced by your stdout { codec => rubydebug } output.

Hello,

This is an example of the output:

"message" => "Sep 19 17:45:04 192.168.X.X date=2016-09-19 time=17:43:23 devname=devname devid=devid logid=0000000013 type=traffic subtype=forward level=notice vd=root srcip=192.168.X.X srcport=51077 srcintf=\"srcintf\" dstip=X.X.X.X dstport=1111 dstintf=\"wan1\" sessionid=19645822 proto=6 action=deny policyid=0 dstcountry=\"dstcountry\" srccountry=\"Reserved\" trandisp=noop service=\"servicename\" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat=\"unscanned\" crscore=30 craction=131072 crlevel=high",
      "@version" => "1",
    "@timestamp" => "2016-09-21T08:02:42.967Z",
          "path" => "/var/log/firewall/heimdall.log",
          "host" => "nqhq-srv-lin-syslog",
          "type" => [
        [0] "linux-syslog",
        [1] "traffic"
    ],
        "iphost" => "192.168.X.X",
       "timelog" => "17:43:23",
       "subtype" => "forward",
         "level" => "notice",
         "srcip" => "192.168.X.X",
       "srcport" => "51077"
}

But that looks pretty good: you have already extracted a handful of fields from the input string. (The backslashes are not part of the message itself; they are just how the rubydebug codec escapes double quotes when it prints the event.) Still, I strongly suggest that you use the kv filter instead of a grok filter with a really long expression. First use one grok filter to extract the initial timestamp and the hostname, storing the remainder of the line in a temporary field, and then feed that field to a kv filter.
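
A sketch of that second step: the kv filter's defaults (fields split on whitespace, keys split from values on "=") should fit this format, and it should treat a double-quoted value such as dstcountry="United States" as a single value, with the quotes stripped. The remove_field setting is optional housekeeping, shown here only as an illustration:

        kv {
                # Parse key=value pairs from the temporary field filled by grok.
                source => "fgtlogmsg"
                # Optionally drop the temporary field once it has been parsed.
                remove_field => ["fgtlogmsg"]
        }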

This is the final config file:
input {
        file {
                path => "/var/log/firewall/heimdall.log"
                type => "linux-syslog"
        }
}

filter {
        grok {
                match => {
                        "message" => '%{SYSLOGTIMESTAMP} %{IPV4:iphost} %{GREEDYDATA:fgtlogmsg}'
                }
        }
        kv {
                source => "fgtlogmsg"
        }
}

output {
        elasticsearch {
                codec => "json"
                hosts => ["127.0.0.1:9200"]
                index => "heimdall"
        }
        stdout { codec => rubydebug }
}

It seems to work very well. In Kibana I now see the individual fields.

Thanks so much. Now I will look into how to search in Kibana.
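
As a possible refinement, sketched under the assumption that the kv filter has produced the date and time fields visible in the log sample: a mutate filter can join them into one string, and a date filter can then set @timestamp to the firewall's own clock instead of the time Logstash read the line. The fgt_timestamp name is just an illustrative temporary field:

filter {
        mutate {
                # Join the firewall's own date and time into one temporary
                # field, e.g. "2016-09-19 17:05:12".
                add_field => { "fgt_timestamp" => "%{date} %{time}" }
        }
        date {
                # Use it as the event timestamp instead of the read time.
                match => ["fgt_timestamp", "yyyy-MM-dd HH:mm:ss"]
                # Drop the temporary field once @timestamp has been set.
                remove_field => ["fgt_timestamp"]
        }
}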