Logstash kv plugin is not working

I am using ELK 7.6.2

The problem is: when I remove the kv plugin, logs are loaded into Elasticsearch; when I add the kv plugin, no data is loaded into Elasticsearch at all.
Moreover, no error is logged in logstash-plain.log.
I have Googled but found no solution. Please help. Thank you.

The log sample is

Mar 24 16:51:40 2022 notice 111.2.333.44 111.22.333.44 time=16:51:39 devname="FG0000-NAT-B" devid="FG1K5Drfvv2345" logid="0000000013" type="traffic" subtype="forward" level="notice" vd="nat" eventtime=1648111900042439291 tz="+0800" srcip=11.222.333.144 srcport=523 srcintf="port34" srcintfrole="undefined" dstip=123.13.64.44 dstport=444 dstintf="port34" dstintfrole="undefined" sessionid=1355513311 proto=6 action="close" policyid=3 policytype="policy" poluuid="016d412dr0-72131298-51e7-3af0-d351asf32980" service="HTTPS" dstcountry="Singapore" srccountry="Reserved" trandisp="snat" transip=213.111.149.44 transport=2214 duration=2 sentbyte=4709 rcvdbyte=7217 sentpkt=23 rcvdpkt=17 shapingpolicyid=3 shaperperipname="ts-perip-web-dns" shaperperipdropbyte=0 appcat="unscanned"

In the filter plugin

grok {
    match => { "message" => "^%{SYSLOGTIMESTAMP:timestamp}\s+%{YEAR:year}\s%{WORD:priority}\s%{IP:sourceIP}\s%{IP:host}\s%{GREEDYDATA:msg}$" }
}

kv {
    source => "msg"
    trim_value => "\""
    value_split => "="
    field_split_pattern => "\s+"
    #remove_field => ["msg"]
}


If changing the filters results in the event not being indexed, there is most likely a mapping exception caused by a type conflict in the Elasticsearch index. Check the Logstash logs.

The log does not contain any related error.
I have not set any mapping for the index; it is newly created.

Is anyone else experiencing this?

What is your output? Is your grok working?

I tried to simulate it and your grok is not working; since the grok fails, you won't have the field to be used in the kv filter.

From your sample message, you are trying to parse the kv message from Fortigate devices; this is pretty simple.

You do not need the extra configuration in your kv filter; just the source option is enough to parse this kind of log.

And if your messages always have the same structure, you also do not need grok; you could use a dissect filter.

Try the following:

filter {
    dissect {
        mapping => {
            "message" => "%{timestamp} %{+timestamp} %{+timestamp} %{year} %{priority} %{sourceIP} %{host} %{msg}"
        }
        remove_field => ["message"]
    }
    kv {
        source => "msg"
        remove_field => ["msg"]
    }
}

The above filter configuration will parse your message and remove both the original message field and the source msg field used by kv.
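As a side note, the %{+timestamp} tokens in the dissect mapping append consecutive space-delimited tokens into a single field. Here is a small Python sketch of that behavior (an illustration only, not Logstash dissect itself; the sample line is shortened):

```python
# Illustration (plain Python, not Logstash dissect) of how the
# %{+timestamp} append modifier joins consecutive tokens into one field.
line = ("Mar 24 16:51:40 2022 notice 111.2.333.44 111.22.333.44 "
        "time=16:51:39 devname=\"FG0000-NAT-B\"")
tokens = line.split(" ", 7)  # peel off the first 7 space-delimited tokens
event = {
    "timestamp": " ".join(tokens[0:3]),  # "Mar" + "24" + "16:51:40"
    "year": tokens[3],
    "priority": tokens[4],
    "sourceIP": tokens[5],
    "host": tokens[6],
    "msg": tokens[7],  # the key-value remainder, to be handed to kv
}
print(event["timestamp"])  # Mar 24 16:51:40
```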

Your result will be similar to this one:

{
                  "level" => "notice",
                "transip" => "213.111.149.44",
                  "proto" => "6",
                "sentpkt" => "23",
              "timestamp" => "Mar 24 16:51:40",
                   "type" => "traffic",
             "policytype" => "policy",
                   "host" => "111.22.333.44",
                "srcport" => "523",
                "rcvdpkt" => "17",
             "@timestamp" => 2022-04-02T19:49:54.277Z,
                "dstintf" => "port34",
               "policyid" => "3",
                "dstport" => "444",
                  "devid" => "FG1K5Drfvv2345",
                 "action" => "close",
                   "year" => "2022",
            "srcintfrole" => "undefined",
               "@version" => "1",
                "subtype" => "forward",
                   "time" => "16:51:39",
                  "logid" => "0000000013",
              "eventtime" => "1648111900042439291",
                "devname" => "FG0000-NAT-B",
    "shaperperipdropbyte" => "0",
        "shapingpolicyid" => "3",
             "srccountry" => "Reserved",
                     "vd" => "nat",
                 "appcat" => "unscanned",
               "priority" => "notice",
              "transport" => "2214",
               "duration" => "2",
                "srcintf" => "port34",
                  "dstip" => "123.13.64.44",
             "dstcountry" => "Singapore",
                "poluuid" => "016d412dr0-72131298-51e7-3af0-d351asf32980",
              "sessionid" => "1355513311",
               "rcvdbyte" => "7217",
               "sourceIP" => "111.2.333.44",
               "trandisp" => "snat",
               "sentbyte" => "4709",
                     "tz" => "+0800",
        "shaperperipname" => "ts-perip-web-dns",
            "dstintfrole" => "undefined",
                "service" => "HTTPS",
                  "srcip" => "11.222.333.144"
}

Thank you for your reply,

Is there anything wrong in my filter config? Would you point it out?
The grok plugin worked for me.
By the way, how do I use the kv filter without any configuration to parse the beginning of the log?
Mar 24 16:51:40 2022 notice 111.2.333.44 111.22.333.44

dissect may not work for me, since the log structure changes due to optional fields

You are right, your grok is working; I just checked it. The issue was that your sample message has a couple of invalid IPs (111.2.333.44 is not a valid IP), which broke the pipeline when I tried to simulate it, so I assumed the grok was broken.

btw how do I use kv filter without any configuration to parse the beginning log part?
Mar 24 16:51:40 2022 notice 111.2.333.44 111.22.333.44

You don't. The kv filter only works on valid key-value messages, so since your message has a string part and a key-value part, you need to get the key-value part into a separate field. Your grok is already doing this: it puts the key-value part in the msg field and creates the other fields from the string part.

After your grok, you have the msg field with the following content:

time=16:51:39 devname="FG0000-NAT-B" devid="FG1K5Drfvv2345" logid="0000000013" type="traffic" subtype="forward" level="notice" vd="nat" eventtime=1648111900042439291 tz="+0800" srcip=11.222.333.144 srcport=523 srcintf="port34" srcintfrole="undefined" dstip=123.13.64.44 dstport=444 dstintf="port34" dstintfrole="undefined" sessionid=1355513311 proto=6 action="close" policyid=3 policytype="policy" poluuid="016d412dr0-72131298-51e7-3af0-d351asf32980" service="HTTPS" dstcountry="Singapore" srccountry="Reserved" trandisp="snat" transip=213.111.149.44 transport=2214 duration=2 sentbyte=4709 rcvdbyte=7217 sentpkt=23 rcvdpkt=17 shapingpolicyid=3 shaperperipname="ts-perip-web-dns" shaperperipdropbyte=0 appcat="unscanned"

To parse this with kv you just need:

kv {
    source => "msg"
}

But again, it is not clear what your issue is, since you didn't share the output you are getting or any log with errors.

Is there anything else I can get for you? There is no error log in logstash-plain.log.

You need to share the output you are getting, and also share your full pipeline.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.