Problem: Logstash filtering with mutate/grok

Hi all, I'm new to the Elastic Stack environment and I have a few questions below.

I have a problem with Logstash filter parsing. I configured my firewall lab to send JSON logs to Logstash over TCP port 5000.

How can I use a mutate/grok filter to separate "LogId", "NodeId", "Facility", "Type", "Event", "Action", "Src", "Dst", "Service", "Protocol", "Sport", "Dport", "RuleId", "NatSrc", "NatDst", "NatSport", "NatDport", "Srcif", "SrcVlan", "CompId", "NatRuleId", "SenderType", "Situation", "EventId" from the "message" field, and turn each of them into its own field?

Some information:

Table

@timestamp  Feb 7, 2022 @ 17:56:39.752
@version    1
_id         IyXU034BJO1ry1ijGUSx
_index      ngfw
_score      -
_type       _doc
host        xxx.xxx.xxx.xxx
message     <6>{"Timestamp":"2022-02-07 17:56:39", "LogId":"876936284", "NodeId":"xxx.xxx.xxx.xxx", "Facility":"Inspection", "Type":"Notification", "Event":"New connection", "Action":"Allow", "Src":"xxx.xxx.xxx.xxx", "Dst":"xxx.xxx.xxx.xxx", "Service":"TCP/30303", "Protocol":"6", "Sport":"39118", "Dport":"30303", "RuleId":"109.8", "NatSrc":"xxx.xxx.xxx.xxx", "NatDst":"xxx.xxx.xxx.xxx", "NatSport":"56458", "NatDport":"30303", "Srcif":"1", "SrcVlan":"3", "CompId":"CANAJKT-NGFW node 1", "NatRuleId":"260100.0 (NAT definition from engine)", "ReceptionTime":"2022-02-07 17:56:16", "SenderType":"Firewall", "Situation":"Connection_Allowed", "EventId":"6896406339135907837"}
port        53,824
tags        _mutate_error

===============================================================================

JSON
{
  "_index": "ngfw",
  "_type": "_doc",
  "_id": "IyXU034BJO1ry1ijGUSx",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2022-02-07T10:56:39.752Z",
    "host": "xxx.xxx.xxx.xxx",
    "tags": [
      "_mutate_error"
    ],
    "port": 53824,
    "message": [
      "<6>{\"Timestamp\":\"2022-02-07 17:56:39\"",
      "\"LogId\":\"876936284\"",
      "\"NodeId\":\"xxx.xxx.xxx.xxx\"",
      "\"Facility\":\"Inspection\"",
      "\"Type\":\"Notification\"",
      "\"Event\":\"New connection\"",
      "\"Action\":\"Allow\"",
      "\"Src\":\"xxx.xxx.xxx.xxx\"",
      "\"Dst\":\"xxx.xxx.xxx.xxx\"",
      "\"Service\":\"TCP/30303\"",
      "\"Protocol\":\"6\"",
      "\"Sport\":\"39118\"",
      "\"Dport\":\"30303\"",
      "\"RuleId\":\"109.8\"",
      "\"NatSrc\":\"xxx.xxx.xxx.xxx\"",
      "\"NatDst\":\"xxx.xxx.xxx.xxx\"",
      "\"NatSport\":\"56458\"",
      "\"NatDport\":\"30303\"",
      "\"Srcif\":\"1\"",
      "\"SrcVlan\":\"3\"",
      "\"CompId\":\"CANAJKT-NGFW node 1\"",
      "\"NatRuleId\":\"260100.0 (NAT definition from engine)\"",
      "\"ReceptionTime\":\"2022-02-07 17:56:16\"",
      "\"SenderType\":\"Firewall\"",
      "\"Situation\":\"Connection_Allowed\"",
      "\"EventId\":\"6896406339135907837\"}"
    ],
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2022-02-07T10:56:39.752Z"
    ]
  },
  "sort": [
    1644231399752
  ]
}

===============================================================================

Here is my Logstash configuration (ngfw.conf):

input {
  tcp {
    port => 5000
  }
}

filter {
  mutate {
       split => {"message" => ","}
       remove_field => ["message", "@version"]
  }
}


output {
  elasticsearch{
    hosts => "100.100.100.100:9200"
    index => "ngfw"
  }
}

Maybe you have some good advice for me, thank you.

Please format your text using markdown. At the moment it is impossible to tell whether your message field is an array of strings, or JSON, or something else.

Select the logstash configuration and click </> in the toolbar above the edit pane. Note that the preview pane will change from

input {
tcp {
port => 5000
}
}

to

input {
    tcp {
        port => 5000
    }
}

then do the same for the JSON.

Sorry, I forgot about that. I have updated the post.

Can you try removing the mutate+split and replacing it with

mutate { gsub => [ "message", "^<\d+>", "" ] }
json { source => "message" }
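If you want to see what those two filters do before wiring them into your pipeline, here is a minimal test sketch (my own example, not your setup — it swaps your tcp and elasticsearch plugins for stdin and stdout): you paste one sample log line, the gsub strips the leading syslog priority such as "<6>" so the rest of the message is valid JSON, and the json filter parses that string into top-level fields (LogId, NodeId, and so on).

input { stdin {} }

filter {
  # remove the syslog priority prefix, e.g. "<6>", so the remainder is plain JSON
  mutate { gsub => [ "message", "^<\d+>", "" ] }
  # parse the JSON string in "message" into top-level event fields
  json { source => "message" }
}

output { stdout { codec => rubydebug } }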

Thanks a lot Badger, it works! After refreshing the index, the new fields appeared.

input {
  tcp {
    port => 5000
  }
}

filter {
  mutate { gsub => [ "message", "^<\d+>", "" ] }
  json { source => "message" }
}

output {
  elasticsearch{
    hosts => "xxx.xxx.xxx.xxx:9200"
    index => "ngfw"
  }
}
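Not part of the fix itself, but as an optional follow-up sketch: once the json filter has parsed the message, you could take @timestamp from the firewall's own Timestamp field instead of the ingest time, and drop the raw message string. The timezone below is an assumption on my part (the sample's local time is seven hours ahead of the stored UTC @timestamp), so adjust it to match your firewall's clock.

filter {
  mutate { gsub => [ "message", "^<\d+>", "" ] }
  json { source => "message" }
  # (optional) take @timestamp from the log's own Timestamp field
  date {
    match => [ "Timestamp", "yyyy-MM-dd HH:mm:ss" ]
    timezone => "Asia/Jakarta"   # assumption based on the 7-hour offset in the sample
  }
  # (optional) drop the raw JSON string once it has been parsed into fields
  mutate { remove_field => [ "message" ] }
}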

