Can the kv processor be used when the intended value itself contains the field split character?

The following sample log entry is in the form of key=value pairs:

action="blocked" direction="outgoing" msg="from ip is in blocklist" proto=6 

I tried the following kv processor in an ingest pipeline for this log entry:

{
  "kv": {
    "field": "kvmsg",
    "field_split": " ",
    "value_split": "="
  }
}

Here "field_split" is " ", i.e. a single space.
But it fails on the key-value pair msg="from ip is in blocklist",
because the intended value "from ip is in blocklist" itself contains the field split character (a space).

Is there any way to still use a kv processor for this log entry?
Thanks and regards
shini
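
To make the failure concrete, here is a minimal Python sketch that only mimics the splitting logic of field_split=" " and value_split="=". It is not the Elasticsearch implementation, and the function name naive_kv is made up for this illustration.

```python
# Rough emulation of splitting on " " and then on "=".
# naive_kv is a hypothetical helper, not part of any Elasticsearch API.
line = 'action="blocked" direction="outgoing" msg="from ip is in blocklist" proto=6'

def naive_kv(text, field_split=" ", value_split="="):
    pairs = {}
    fragments = []  # tokens that contain no value_split at all
    for token in text.split(field_split):
        key, sep, value = token.partition(value_split)
        if sep:
            pairs[key] = value
        else:
            fragments.append(token)
    return pairs, fragments

pairs, fragments = naive_kv(line)
print(pairs["msg"])  # "from   (only the first word, with the opening quote)
print(fragments)     # ['ip', 'is', 'in', 'blocklist"']
```

The words inside the quoted msg value end up as separate tokens without an "=", which is the situation the kv processor cannot handle with a plain space as field_split.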

There is probably a more efficient way, but this works.

Start with the original message:
action="blocked" direction="outgoing" msg="from ip is in blocklist" proto=6
First gsub: replace " with -
action=-blocked- direction=-outgoing- msg=-from ip is in blocklist- proto=6
Second gsub: replace =- with =
action=blocked- direction=outgoing- msg=from ip is in blocklist- proto=6

Now you can run kv with = as the value split and "- " (a hyphen followed by a space) as the field split.

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "_description",
    "processors": [
      {
        "gsub": {
          "field": "message",
          "pattern": "\"",
          "replacement": "-"
        }
      },
      {
        "gsub": {
          "field": "message",
          "pattern": "=-",
          "replacement": "="
        }
      },      
      {
        "kv": {
          "field": "message",
          "field_split": "- ",
          "value_split": "="
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "message": """action="blocked" direction="outgoing" msg="from ip is in blocklist" proto=6"""
      }
    }
  ]
}
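
For anyone who wants to check the two substitutions outside Elasticsearch, the same steps can be sketched in Python. This only imitates the gsub and kv steps with plain string operations; the function name gsub_then_kv is an assumption for illustration.

```python
import re

def gsub_then_kv(message):
    """Imitate the two gsub processors and the kv split with plain Python."""
    step1 = re.sub(r'"', "-", message)   # first gsub: " becomes -
    step2 = step1.replace("=-", "=")     # second gsub: =- becomes =
    fields = {}
    for token in step2.split("- "):      # field_split "- "
        key, _, value = token.partition("=")  # value_split "=" (first occurrence)
        fields[key] = value
    return step2, fields

sample = 'action="blocked" direction="outgoing" msg="from ip is in blocklist" proto=6'
rewritten, fields = gsub_then_kv(sample)
print(rewritten)
# action=blocked- direction=outgoing- msg=from ip is in blocklist- proto=6
print(fields["msg"])
# from ip is in blocklist
```

Note that this relies on every field except the last one being quoted, since the "- " separator only appears after a closing quote.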

Sir,
It works for the log fragment as you explained.
The actual logs to be handled are given below, and they have more fields after proto=6.
Since 6 has no quotes around it, the above method does not produce the "- " separator (a hyphen followed by a space) after proto=6.

So how can I handle that situation?
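
A small Python sketch of the same gsub steps shows what goes wrong here. The fragment below is shortened from the real logs, and the helper name gsub_then_kv is only for illustration; it imitates the processors rather than calling Elasticsearch.

```python
def gsub_then_kv(message):
    """Imitate gsub(" -> -), gsub(=- -> =), then kv on "- " and "="."""
    rewritten = message.replace('"', "-").replace("=-", "=")
    fields = {}
    for token in rewritten.split("- "):
        key, _, value = token.partition("=")
        fields[key] = value
    return fields

# Shortened fragment: an unquoted value (proto=6) followed by more fields.
fragment = 'dstintfrole="dmz" proto=6 service="SMTPS" action="log-only"'
fields = gsub_then_kv(fragment)
print(fields["proto"])      # 6 service=SMTPS
print("service" in fields)  # False
print(fields["action"])     # log-only-
```

The unquoted value swallows the next field, and the last quoted value keeps a trailing hyphen because no space follows it.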

The actual logs have spaces in the msg and subject fields.
The set of fields also differs from row to row.

2021-09-27T04:03:18.223319+05:30 _gateway date=2021-09-27 time=04:05:36 devname="PPFW06" devid="FGT5HD3916803686" eventtime=1632695736931083602 tz="+0530" logid="0512044481" type="utm" subtype="emailfilter" eventtype="email" level="information" vd="root" policyid=5 sessionid=1501138 srcip=209.85.85.86 srcport=2012 srcintf="port12" srcintfrole="wan" dstip=10.101.200.49 dstport=25 dstintf="port2" dstintfrole="dmz" proto=6 service="SMTPS" profile="sec_Email_Antispam_WAN_DMZ" action="log-only" from="vidyas@gmail.com" to="hariss@sec2.gov.in,lalu@sec.gov.in,lali@gmail.com" sender="rema@gmail.com" recipient="lalu@sec.gov.in" direction="outgoing" msg="testnospace" subject="Testmail1" size="9943" attachment="yes"
2021-09-27T04:10:26.326831+05:30 _gateway date=2021-09-27 time=04:12:44 devname="PPFW02" devid="FGT5HD3916803686" eventtime=1632696165098764598 tz="+0530" logid="0512044481" type="utm" subtype="emailfilter" eventtype="email" level="information" vd="root" policyid=5 sessionid=1505786 srcip=209.71.72.71 srcport=32277 srcintf="port12" srcintfrole="wan" dstip=172.18.200.59 dstport=25 dstintf="port2" dstintfrole="dmz" proto=6 service="SMTPS" profile="pp_Email_Antispam_WAN" action="log-only" from="users@in.jbe.best-jobs-online.com" to="dfa@sec.gov.in" sender="bounce+88.78b08ef-amt=sec.gov.in@in.jbe.good-jobs-online.com" recipient="amt@sec.gov.in" direction="outgoing" msg="testnospace" subject="testmail33" size="110" attachment="no"

If there are no spaces in the msg and subject fields,
the following pipeline works:

PUT _ingest/pipeline/lp_indexttt-pipeline
{
  "description": "Test Pipeline for key-value",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern" : "%{timestamp} _gateway  %{kvmsg}"
      }
    },
    {
      "kv": {
        "field": "kvmsg",
        "field_split": " ",
        "value_split": "="
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": [
          "ISO8601"
        ],
        "output_format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSXXX"
      }
    },
    {
      "remove": {
        "field": "timestamp"
      }
    }
  ]
}
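
As a rough illustration of what this pipeline does to one line, here is a Python sketch. The input is a shortened version of the first log line, the " _gateway " delimiter uses a single space as in the sample logs, and the code only approximates dissect, kv and date; it is not how Elasticsearch implements them.

```python
from datetime import datetime

# Shortened log line (most fields left out for brevity).
raw = ('2021-09-27T04:03:18.223319+05:30 _gateway '
       'date=2021-09-27 time=04:05:36 devname="PPFW06" proto=6')

# dissect-like step: everything before " _gateway " is the timestamp.
timestamp_text, _, kvmsg = raw.partition(" _gateway ")

# date-like step: parse the ISO 8601 timestamp, keeping the UTC offset.
timestamp = datetime.fromisoformat(timestamp_text)

# kv-like step: split fields on a space and each pair on the first "=".
fields = dict(token.split("=", 1) for token in kvmsg.split(" "))

print(timestamp.isoformat())  # 2021-09-27T04:03:18.223319+05:30
print(fields["devname"])      # "PPFW06"  (quotes kept, nothing trims them)
```

With no spaces inside the values, every token contains an "=", so the simple split is enough.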

But it cannot handle spaces inside values.

How should I proceed?

Thanks and regards,
shini

Having seen the full logs, I would switch to grok, which would be much easier than kv here.

Here is the start of the pattern. You just need to continue it through the whole message.

%{TIMESTAMP_ISO8601:timestamp} %{WORD:gateway} date=%{DATA:date} time=%{DATA:time} devname="%{DATA:devname}" devid=

The problem with grok is that these are FortiGate firewall logs, and depending on the features enabled you can have hundreds of different line formats with the fields in different orders. That would need a lot of grok patterns, which will certainly have an impact on performance.

Filebeat has a fortinet module that parses these messages, so I think the best approach would be to replicate how the Filebeat ingest pipeline parses them.

This is how that ingest pipeline parses the message:

- grok:
    field: message
    patterns:
    - '%{SYSLOG5424PRI}%{GREEDYDATA:syslog5424_sd}$'
- kv:
    field: syslog5424_sd
    field_split: " (?=[a-z\\_\\-]+=)"
    value_split: "="
    prefix: "fortinet.tmp."
    ignore_missing: true
    ignore_failure: false
    trim_value: "\""
- rename:
    field: fortinet.tmp
    target_field: fortinet.firewall
    ignore_missing: true

I think that you can use the following processors in your ingest pipeline to parse this message:

{
  "dissect": {
    "field": "message",
    "pattern" : "%{timestamp} _gateway %{kvmsg}"
  }
},
{
  "kv": {
    "field": "kvmsg",
    "field_split": " (?=[a-z\\_\\-]+=)",
    "value_split": "=",
    "trim_value": "\""
  }
}
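
To see why the lookahead in field_split helps, here is a minimal Python sketch of the same idea: split only on a space that is immediately followed by something that looks like a key and an "=". It is an approximation for experimenting, not the kv processor itself; parse_kv and the shortened sample are assumptions made for this example.

```python
import re

# Split on a space only when a lowercase key and "=" follow it.
FIELD_SPLIT = re.compile(r" (?=[a-z_\-]+=)")

def parse_kv(text, trim='"'):
    fields = {}
    for token in FIELD_SPLIT.split(text):
        key, sep, value = token.partition("=")  # split on the first "=" only
        if sep:
            fields[key] = value.strip(trim)     # rough stand-in for trim_value
    return fields

# Shortened from the real log line.
sample = ('proto=6 service="SMTPS" '
          'sender="bounce+88.78b08ef-amt=sec.gov.in@in.jbe.good-jobs-online.com" '
          'msg="A message that has spaces" subject="testmail33"')
fields = parse_kv(sample)
print(fields["msg"])    # A message that has spaces
print(fields["proto"])  # 6

# Hypothetical value (not from the logs) that itself contains " key=":
tricky = parse_kv('msg="retry count=3 exceeded" proto=6')
print(tricky)           # {'msg': 'retry', 'count': '3 exceeded', 'proto': '6'}
```

The spaces inside msg are not followed by key=, so they are left alone, and the "=" inside the sender address is harmless because no space precedes it. The last call shows the one case where this pattern would presumably still split wrongly: a quoted value that contains a space followed by a lowercase word and "=".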

So, using what @leandrojmp provided (thanks, today I learned!):

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "time test",
    "version": 0,
    "processors": [
      {
        "kv": {
          "field": "message",
          "field_split": """ (?=[a-z\_\-]+=)""",
          "value_split": "=",
          "ignore_missing": true,
          "ignore_failure": false,
          "trim_value": "\"",
          "strip_brackets": true
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "m-index",
      "_id": "kMpUTHoBr7SFhhL5-98P",
      "_source": {
        "message": """date=2021-09-27 time=04:12:44 devname="PPFW02" devid="FGT5HD3916803686" eventtime=1632696165098764598 tz="+0530" logid="0512044481" type="utm" subtype="emailfilter" eventtype="email" level="information" vd="root" policyid=5 sessionid=1505786 srcip=209.71.72.71 srcport=32277 srcintf="port12" srcintfrole="wan" dstip=172.18.200.59 dstport=25 dstintf="port2" dstintfrole="dmz" proto=6 service="SMTPS" profile="pp_Email_Antispam_WAN" action="log-only" from="users@in.jbe.best-jobs-online.com" to="dfa@sec.gov.in" sender="bounce+88.78b08ef-amt=sec.gov.in@in.jbe.good-jobs-online.com" recipient="amt@sec.gov.in" direction="outgoing" msg="A message that has spaces" subject="testmail33" size="110" attachment="no"""
      }
    }
  ]
}

This results in:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "m-index",
        "_type" : "_doc",
        "_id" : "kMpUTHoBr7SFhhL5-98P",
        "_source" : {
          "date" : "2021-09-27",
          "devid" : "FGT5HD3916803686",
          "msg" : "A message that has spaces", <!--- :)
          "srcip" : "209.71.72.71",
          "srcintfrole" : "wan",
          "dstport" : "25",
          "tz" : "+0530",
          "subject" : "testmail33",
          "eventtime" : "1632696165098764598",
          "sessionid" : "1505786",
          "type" : "utm",
          "policyid" : "5",
          "attachment" : "no",
          "subtype" : "emailfilter",
          "action" : "log-only",
          "devname" : "PPFW02",
          "from" : "users@in.jbe.best-jobs-online.com",
          "dstip" : "172.18.200.59",
          "dstintf" : "port2",
          "direction" : "outgoing",
          "srcintf" : "port12",
          "level" : "information",
          "profile" : "pp_Email_Antispam_WAN",
          "message" : """date=2021-09-27 time=04:12:44 devname="PPFW02" devid="FGT5HD3916803686" eventtime=1632696165098764598 tz="+0530" logid="0512044481" type="utm" subtype="emailfilter" eventtype="email" level="information" vd="root" policyid=5 sessionid=1505786 srcip=209.71.72.71 srcport=32277 srcintf="port12" srcintfrole="wan" dstip=172.18.200.59 dstport=25 dstintf="port2" dstintfrole="dmz" proto=6 service="SMTPS" profile="pp_Email_Antispam_WAN" action="log-only" from="users@in.jbe.best-jobs-online.com" to="dfa@sec.gov.in" sender="bounce+88.78b08ef-amt=sec.gov.in@in.jbe.good-jobs-online.com" recipient="amt@sec.gov.in" direction="outgoing" msg="A message that has spaces" subject="testmail33" size="110" attachment="no""",
          "vd" : "root",
          "dstintfrole" : "dmz",
          "size" : "110",
          "sender" : "bounce+88.78b08ef-amt=sec.gov.in@in.jbe.good-jobs-online.com",
          "service" : "SMTPS",
          "proto" : "6",
          "recipient" : "amt@sec.gov.in",
          "srcport" : "32277",
          "logid" : "0512044481",
          "eventtype" : "email",
          "time" : "04:12:44",
          "to" : "dfa@sec.gov.in"
        },
        "_ingest" : {
          "timestamp" : "2021-11-06T15:10:35.5051942Z"
        }
      }
    }
  ]
}

Thanks @leandrojmp for the great ideas.
All the logs are getting indexed now. The kv processor configuration really worked.
Thank you all for the great guidance and support you provide to beginners.
shini
