Remove a specific field from the log and overwrite the message field

I'm trying to parse the following log with the kv filter, but the log message itself contains the keys "message", "type" and "index". These overwrite the event's original "message", "type" and "index" fields, and the entire event gets dropped.

What I need help with is renaming/substituting these three keys to something like srx.msg, srx.event.type and srx.index, and then overwriting the message field with the result. After this, I can simply apply the kv filter and any other required parsers.

Mar 2 15:12:59 host.name.abc 1 2020-03-02T23:13:03.193Z host.name.abc RT_IDP - IDP_ATTACK_LOG_EVENT [junos@2636.1.1.1.2.28 epoch-time="1583190783" message-type="SIG" source-address=“1.x.x.x.” source-port=“12345” destination-address=“1.x.x.x.” destination-port=“123” protocol-name="TCP" service-name="SERVICE_IDP" application-name="HTTP" rule-name="3" rulebase-name="IPS" policy-name="Recommended" export-id="20175" repeat-count="0" action="DROP" threat-severity="HIGH" attack-name="HTTP:MISC:GENERIC-DIR-TRAVERSAL" nat-source-address="0.0.0.0" nat-source-port="13312" nat-destination-address=“3.x.x.x.” nat-destination-port="9757" elapsed-time="0" inbound-bytes="0" outbound-bytes="0" inbound-packets="0" outbound-packets="0" source-zone-name="UNTRUST" source-interface-name="reth1.24" destination-zone-name="DMZ" destination-interface-name="reth2.21" packet-log-id="0" alert="no" username="unknown-user" roles="N/A" index="cnm" type="idp" message="-"]

I've tried the following. I'm not sure it is the correct way of doing it, and it doesn't work:

if [name] == "jp_logs" {
  grok {
    match => {
      "message" => [
        "(?m)%{SYSLOGTIMESTAMP:syslog.timestamp} %{SYSLOGHOST:syslog.hostname} %{POSINT:syslog.pid} %{TIMESTAMP_ISO8601:system.syslog.timestamp} %{SYSLOGHOST:hostname} (?<srx.tag>.+)? - (?<srx.type>[^\s]+)? %{GREEDYDATA:syslog.message}"
      ]
    }
  }

  if "message=" in [syslog.message] or "index=" in [syslog.message] or "type=" in [syslog.message] or "name=" in [syslog.message] {
    mutate {
      gsub => [
        "syslog.message", "message=", "srx.msg=",
        "syslog.message", "type=", "ipsec.type=",
        "syslog.message", "name=", "ipsec.name=",
        "syslog.message", "index=", "ipsec.index="
      ]
    }

    # Update the syslog.message field after renaming the above fields
    mutate {
      update => { "syslog.message" => "syslog.message" }
    }
  }

  kv {
    source => "syslog.message"
    value_split => "="
    allow_duplicate_values => false
    transform_key => "lowercase"
    trim_value => "\[\]"
    remove_field => [ "command" ]
  }
}

Thanks

Hi there,

In what sense does it "not work"? I replicated your pipeline with your input, removing only the update filter, and it seems to work perfectly fine for me. The update filter is unnecessary because gsub already modifies the field in place; as written, it would also replace the field's contents with the literal string "syslog.message".

Using this pipeline:

input {
  stdin{}
}

filter {
  grok {
    match => {
      "message" => 
      [ 
        "(?m)%{SYSLOGTIMESTAMP:syslog.timestamp} %{SYSLOGHOST:syslog.hostname} %{POSINT:syslog.pid} %{TIMESTAMP_ISO8601:system.syslog.timestamp} %{SYSLOGHOST:hostname} (?<srx.tag>.+)? - (?<srx.type>[^\s]+)? %{GREEDYDATA:syslog.message}" 
      ] 
    } 
  }

  if "message=" in [syslog.message] or "index=" in [syslog.message] or "type=" in [syslog.message] or "name=" in [syslog.message] {
    mutate {
      gsub => [
        "syslog.message","message=","srx.msg=",
        "syslog.message","type=","ipsec.type=",
        "syslog.message","name=","ipsec.name=",
        "syslog.message","index=","ipsec.index="
      ]
    }
  }

  kv {
    source => "syslog.message"
    value_split => "="
    allow_duplicate_values => false
    transform_key => "lowercase"
    trim_value => "\[\]"
    remove_field => [ "command" ]
  }
}

output {
  stdout{}
}

And the input document you provided:

Mar 2 15:12:59 host.name.abc 1 2020-03-02T23:13:03.193Z host.name.abc RT_IDP - IDP_ATTACK_LOG_EVENT [junos@2636.1.1.1.2.28 epoch-time="1583190783" message-type="SIG" source-address=“1.x.x.x.” source-port=“12345” destination-address=“1.x.x.x.” destination-port=“123” protocol-name="TCP" service-name="SERVICE_IDP" application-name="HTTP" rule-name="3" rulebase-name="IPS" policy-name="Recommended" export-id="20175" repeat-count="0" action="DROP" threat-severity="HIGH" attack-name="HTTP:MISC:GENERIC-DIR-TRAVERSAL" nat-source-address="0.0.0.0" nat-source-port="13312" nat-destination-address=“3.x.x.x.” nat-destination-port="9757" elapsed-time="0" inbound-bytes="0" outbound-bytes="0" inbound-packets="0" outbound-packets="0" source-zone-name="UNTRUST" source-interface-name="reth1.24" destination-zone-name="DMZ" destination-interface-name="reth2.21" packet-log-id="0" alert="no" username="unknown-user" roles="N/A" index="cnm" type="idp" message="-"]

This is the standard output:

{
                     "inbound-packets" => "0",
                          "syslog.pid" => "1",
                          "@timestamp" => 2020-03-03T09:23:41.224Z,
                  "message-ipsec.type" => "SIG",
                      "syslog.message" => "[junos@2636.1.1.1.2.28 epoch-time=\"1583190783\" message-ipsec.type=\"SIG\" source-address=“1.x.x.x.” source-port=“12345” destination-address=“1.x.x.x.” destination-port=“123” protocol-ipsec.name=\"TCP\" service-ipsec.name=\"SERVICE_IDP\" application-ipsec.name=\"HTTP\" rule-ipsec.name=\"3\" rulebase-ipsec.name=\"IPS\" policy-ipsec.name=\"Recommended\" export-id=\"20175\" repeat-count=\"0\" action=\"DROP\" threat-severity=\"HIGH\" attack-ipsec.name=\"HTTP:MISC:GENERIC-DIR-TRAVERSAL\" nat-source-address=\"0.0.0.0\" nat-source-port=\"13312\" nat-destination-address=“3.x.x.x.” nat-destination-port=\"9757\" elapsed-time=\"0\" inbound-bytes=\"0\" outbound-bytes=\"0\" inbound-packets=\"0\" outbound-packets=\"0\" source-zone-ipsec.name=\"UNTRUST\" source-interface-ipsec.name=\"reth1.24\" destination-zone-ipsec.name=\"DMZ\" destination-interface-ipsec.name=\"reth2.21\" packet-log-id=\"0\" alert=\"no\" useripsec.name=\"unknown-user\" roles=\"N/A\" ipsec.index=\"cnm\" ipsec.type=\"idp\" srx.msg=\"-\"]",
                        "repeat-count" => "0",
                 "protocol-ipsec.name" => "TCP",
                       "inbound-bytes" => "0",
                 "rulebase-ipsec.name" => "IPS",
                             "srx.msg" => "\"-\"",
              "source-zone-ipsec.name" => "UNTRUST",
                   "policy-ipsec.name" => "Recommended",
                              "action" => "DROP",
                         "ipsec.index" => "cnm",
                     "threat-severity" => "HIGH",
                            "@version" => "1",
                                "host" => "fabio",
             "nat-destination-address" => "“3.x.x.x.”",
                    "syslog.timestamp" => "Mar 2 15:12:59",
                 "destination-address" => "“1.x.x.x.”",
                         "source-port" => "“12345”",
         "source-interface-ipsec.name" => "reth1.24",
                  "nat-source-address" => "0.0.0.0",
                "nat-destination-port" => "9757",
                             "message" => "Mar 2 15:12:59 host.name.abc 1 2020-03-02T23:13:03.193Z host.name.abc RT_IDP - IDP_ATTACK_LOG_EVENT [junos@2636.1.1.1.2.28 epoch-time=\"1583190783\" message-type=\"SIG\" source-address=“1.x.x.x.” source-port=“12345” destination-address=“1.x.x.x.” destination-port=“123” protocol-name=\"TCP\" service-name=\"SERVICE_IDP\" application-name=\"HTTP\" rule-name=\"3\" rulebase-name=\"IPS\" policy-name=\"Recommended\" export-id=\"20175\" repeat-count=\"0\" action=\"DROP\" threat-severity=\"HIGH\" attack-name=\"HTTP:MISC:GENERIC-DIR-TRAVERSAL\" nat-source-address=\"0.0.0.0\" nat-source-port=\"13312\" nat-destination-address=“3.x.x.x.” nat-destination-port=\"9757\" elapsed-time=\"0\" inbound-bytes=\"0\" outbound-bytes=\"0\" inbound-packets=\"0\" outbound-packets=\"0\" source-zone-name=\"UNTRUST\" source-interface-name=\"reth1.24\" destination-zone-name=\"DMZ\" destination-interface-name=\"reth2.21\" packet-log-id=\"0\" alert=\"no\" username=\"unknown-user\" roles=\"N/A\" index=\"cnm\" type=\"idp\" message=\"-\"]",
                      "outbound-bytes" => "0",
                     "syslog.hostname" => "host.name.abc",
              "application-ipsec.name" => "HTTP",
                               "alert" => "no",
                      "source-address" => "“1.x.x.x.”",
         "destination-zone-ipsec.name" => "DMZ",
                     "rule-ipsec.name" => "3",
                            "srx.type" => "IDP_ATTACK_LOG_EVENT",
                       "packet-log-id" => "0",
                          "epoch-time" => "1583190783",
                             "srx.tag" => "RT_IDP",
                    "destination-port" => "“123”",
                     "nat-source-port" => "13312",
                            "hostname" => "host.name.abc",
                           "export-id" => "20175",
                        "elapsed-time" => "0",
             "system.syslog.timestamp" => "2020-03-02T23:13:03.193Z",
                               "roles" => "N/A",
                      "useripsec.name" => "unknown-user",
                          "ipsec.type" => "idp",
                   "attack-ipsec.name" => "HTTP:MISC:GENERIC-DIR-TRAVERSAL",
                    "outbound-packets" => "0",
    "destination-interface-ipsec.name" => "reth2.21",
                  "service-ipsec.name" => "SERVICE_IDP"
}

What did you expect to be different?

Also, I strongly recommend not using dot notation in your field names. It can cause problems in later scripts and pipelines, because it is ambiguous whether foo.bar is a root field with a dot in its name or a bar field nested inside a foo field.
For example, if you try that grok pattern in the Kibana Grok Debugger, you'll see that it produces nested fields. So if you want to create nested fields in Logstash, use the [foo][bar] syntax. If you want a root field, I suggest snake case, as in foo_bar.
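
As a rough illustration of that advice, here is a sketch of the same grok pattern with the dotted names replaced. The nested names ([syslog][timestamp], [srx][tag] and so on) and the snake_case name syslog_message are only assumptions about how the fields might be organised, not names taken from the original pipeline.

```
filter {
  grok {
    match => {
      "message" => [
        # %{PATTERN:[a][b]} stores the capture in field b nested under a.
        # The remainder goes into a root field named in snake case.
        "(?m)%{SYSLOGTIMESTAMP:[syslog][timestamp]} %{SYSLOGHOST:[syslog][hostname]} %{POSINT:[syslog][pid]} %{TIMESTAMP_ISO8601:[system][syslog][timestamp]} %{SYSLOGHOST:hostname} %{DATA:[srx][tag]} - %{NOTSPACE:[srx][type]} %{GREEDYDATA:syslog_message}"
      ]
    }
  }
}
```

With names like these, later filters would have to reference the renamed field, for example source => "syslog_message" in the kv filter and "syslog_message" as the field name in gsub. The two inline named captures were swapped for the stock DATA and NOTSPACE patterns here, so unlike the original optional groups this version expects both the tag and the type to be present.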


Thanks @Fabio-sama

That worked very well; the update wasn't required. I will also take care of the field naming convention.

You're welcome. It was your pipeline after all :)

@Fabio-sama

There's one issue: the substitution is applied to every key that contains the pattern "name=". I only want to substitute when the key is exactly "name". With what we have currently, several other fields were changed as well, e.g. source-zone-name became source-zone-ipsec.name and policy-name became policy-ipsec.name.

I just want to substitute the actual fields (if they exist in the event) to avoid any field conflicts.
Thanks

Hi there,

Just add a space before the keys to be replaced. Try something like:

input {
  stdin{}
}

filter {
  grok {
    match => {
      "message" => 
      [ 
        "(?m)%{SYSLOGTIMESTAMP:syslog.timestamp} %{SYSLOGHOST:syslog.hostname} %{POSINT:syslog.pid} %{TIMESTAMP_ISO8601:system.syslog.timestamp} %{SYSLOGHOST:hostname} (?<srx.tag>.+)? - (?<srx.type>[^\s]+)? %{GREEDYDATA:syslog.message}" 
      ] 
    } 
  }

  if "message=" in [syslog.message] or "index=" in [syslog.message] or "type=" in [syslog.message] or "name=" in [syslog.message] {
    mutate {
      gsub => [ 
        "syslog.message"," message="," srx.msg=",
        "syslog.message"," type="," ipsec.type=",
        "syslog.message"," name="," ipsec.name=",
        "syslog.message"," index="," ipsec.index="
      ]
    }
  }

  kv {
    source => "syslog.message"
    value_split => "="
    allow_duplicate_values => false
    transform_key => "lowercase"
    trim_value => "\[\]"
    remove_field => [ "command" ]
  }
}

output {
  stdout{}
}
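
The leading space works for this log because every key follows a space. A possible variation, in case a key could also sit at the very start of the string or directly after a bracket, is a negative lookbehind instead of a literal space. This is only a sketch: it assumes the keys consist of word characters and hyphens, and it relies on the mutate filter treating the gsub patterns as regular expressions. The replacement names are the ones already used above.

```
filter {
  mutate {
    # Each pattern matches a key only when it is NOT preceded by a word
    # character or a hyphen, so "rule-name=", "username=" and "message-type="
    # are left alone while a bare "name=", "type=", "index=" or "message="
    # is renamed.
    gsub => [
      "syslog.message", "(?<![\w-])message=", "srx.msg=",
      "syslog.message", "(?<![\w-])type=", "ipsec.type=",
      "syslog.message", "(?<![\w-])name=", "ipsec.name=",
      "syslog.message", "(?<![\w-])index=", "ipsec.index="
    ]
  }
}
```

If the only goal is to keep the parsed keys from clashing with root fields such as message and type, the kv filter's target or prefix options may be another route, since they place or rename all extracted keys at once.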

Worked perfectly! Thank you again @Fabio-sama

No problem :)
