Extract data from { }


I'm struggling with parsing Fortigate logs. I've gotten to the point where all the log data ends up within a field marked with { logdata }.

I'm using a grok filter and a kv filter after that. The problem with the kv filter is that if I don't set a target, Logstash doesn't do anything: it runs without errors, but no file is created. If I do define a target, the data gets written there and looks good, but I need to get rid of the kv: { and } wrapper so that everything becomes a separate field:value pair.

How can I create separate fields based on the data within { }?

Raw log

"@version":"1","host":"","@timestamp":"2017-11-16T04:44:28.149Z","message":"<188>date=2017-11-16,time=04:44:26,devname=xxxxxxx,device_id=xxxxxxx,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=,src_port=49156,src_int=\"wan1\",dst=,dst_port=1947,dst_int=\"root\",SN=44832,status=deny,policyid=0,dst_country=\"Reserved\",src_country=\"Reserved\",service=MMS,proto=17,duration=16225,sent=0,rcvd=0,msg=\"iprope_in_check() check failed, drop\"","type":"fortilog"


input {
  udp {
    port => 9910
    type => "fortilog"
  }
}

filter {
  if [type] == "fortilog" {

    grok {
      match => ["message", "%{SYSLOG5424PRI:syslog_index}%{GREEDYDATA:message}"]
      overwrite => [ "message" ]
      tag_on_failure => [ "grok_failure" ]
    }

    kv {
      source => "message"
      value_split => "="
      field_split => ","
      target => "kv"
    }

    mutate {
      remove_field => ["message"]
    }
  }
}


output {
  if [type] == "fortilog" {
    file {
      path => "/home/test/Desktop/test/forti/test-%{+YYYY-MM-dd.HH}.gz"
      gzip => true
    }
  }
}

This creates the following output:

{"@timestamp":"2017-11-16T05:31:06.684Z","syslog_index":"<188>","syslog5424_pri":"188","@version":"1","host":"","kv":{"date":"2017-11-16","src_int":"wan1","msg":"iprope_in_check() check failed, drop","dst":"","type":"traffic","dst_int":"root","duration":"19023","policyid":"0","subtype":"other","devname":"xxxxxxxxxx","SN":"52890","dst_country":"Reserved","log_id":"0038000007","device_id":"xxxxxxxxxx","src":"","pri":"warning","rcvd":"0","sent":"0","vd":"root","src_port":"17500","src_country":"Reserved","service":"17500/udp","proto":"17","dst_port":"17500","time":"05:31:01","status":"deny"},"type":"fortilog"}

How can I extract the data inside kv: { } into separate fields? I tried a json filter, another kv filter, etc., but I just can't get it to work.

They are separate fields, just nested under the kv field. If you don't want that then remove target => "kv".
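In other words, with the target line dropped from the filter above (a minimal sketch of the same kv filter), the parsed keys land at the top level of the event instead of nested under kv:

kv {
  source => "message"
  value_split => "="
  field_split => ","
}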

The problem is that nothing happens when I remove the target option.

Logstash is running, listening, not reporting any errors but no file is created.

If, for example, I put a second kv filter after the first one, pointed at the target field, the output is exactly the same as without the second kv filter, leading me to believe that

kv {
  value_split => ":" (or "=")
  field_split => ","
}

isn't doing anything, or is throwing away all the data.

Okay, hold on, I think I figured it out.

I'm going to check but I must be an idiot.

Here's a minimal example that works as expected for me:

$  cat data 
date=2017-11-16,time=04:44:26,devname=xxxxxxx,device_id=xxxxxxx,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=,src_port=49156,src_int="wan1",dst=,dst_port=1947,dst_int="root",SN=44832,status=deny,policyid=0,dst_country="Reserved",src_country="Reserved",service=MMS,proto=17,duration=16225,sent=0,rcvd=0,msg="iprope_in_check() check failed, drop"
$ cat test.config 
input { stdin {} }
output { stdout { codec => rubydebug } }
filter {
  kv {
    value_split => "="
    field_split => ","
  }
}
$ /opt/logstash/bin/logstash -f test.config < data
Settings: Default pipeline workers: 8
Pipeline main started
{
        "message" => "date=2017-11-16,time=04:44:26,devname=xxxxxxx,device_id=xxxxxxx,log_id=0038000007,type=traffic,subtype=other,pri=warning,vd=root,src=,src_port=49156,src_int=\"wan1\",dst=,dst_port=1947,dst_int=\"root\",SN=44832,status=deny,policyid=0,dst_country=\"Reserved\",src_country=\"Reserved\",service=MMS,proto=17,duration=16225,sent=0,rcvd=0,msg=\"iprope_in_check() check failed, drop\"",
       "@version" => "1",
     "@timestamp" => "2017-11-16T07:19:29.463Z",
           "host" => "lnxolofon",
           "date" => "2017-11-16",
           "time" => "04:44:26",
        "devname" => "xxxxxxx",
      "device_id" => "xxxxxxx",
         "log_id" => "0038000007",
           "type" => "traffic",
        "subtype" => "other",
            "pri" => "warning",
             "vd" => "root",
            "src" => "",
       "src_port" => "49156",
        "src_int" => "wan1",
            "dst" => "",
       "dst_port" => "1947",
        "dst_int" => "root",
             "SN" => "44832",
         "status" => "deny",
       "policyid" => "0",
    "dst_country" => "Reserved",
    "src_country" => "Reserved",
        "service" => "MMS",
          "proto" => "17",
       "duration" => "16225",
           "sent" => "0",
           "rcvd" => "0",
            "msg" => "iprope_in_check() check failed, drop"
}
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}

You can verify that it works on your side, then start migrating it towards the desired configuration.

The very dumb mistake I made was screwing up the if [type] == conditional in my output. The Fortigate logs contain a type field as well, and Logstash was matching against the Fortigate type field instead of the one set by the input.
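For anyone hitting the same collision, one way to sidestep it (a sketch, not the exact config used in this thread) is to condition the output on a tag instead of on [type], since the kv-parsed Fortigate fields can't clobber tags:

filter {
  if [type] == "fortilog" {
    mutate { add_tag => ["fortilog"] }
    # grok/kv as above; even if kv later sets [type] to "traffic",
    # the tag survives
  }
}
output {
  if "fortilog" in [tags] {
    file {
      path => "/home/test/Desktop/test/forti/test-%{+YYYY-MM-dd.HH}.gz"
      gzip => true
    }
  }
}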

I now have a fully working config that does all the processing in one go. I've already tested outputting directly to Elasticsearch and that works fine, so for my use case I think I can just use a simple json codec config on my main server to read the compressed files created by the remote client.
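Something like this might do on the main-server side (a sketch with a placeholder path, assuming a Logstash version whose file input supports mode => "read", which reads .gz files to completion; older versions would need the files decompressed first):

input {
  file {
    path => "/home/test/incoming/test-*.gz"   # placeholder: wherever the client drops the files
    mode => "read"    # read each file once, to completion; decompresses .gz
    codec => "json"   # one JSON event per line, as written by the file output
    # note: in read mode the default file_completed_action deletes files when done
  }
}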

I'll open a separate topic with the full config for the board/google search.
