Message field within message

Hi All,

Raw SYSLOG event:

Jul 21 01:19:57.58 172.20.20.100 date=2016-07-20 time=20:12:25 timezone="UTC" device_name="CRiV-1C" device_id=1234-567 log_id=062009517502 log_type="Event" log_component="GUI" log_subtype="Admin" status="Successful" priority=Notice user_name="admin" src_ip=10.0.0.26 message="Appliance Access Settings were changed by 'admin' from '10.0.0.26' using 'GUI'"

Problem Statement:

As per my knowledge Logstash encapsulate entire syslog event into a payload or record called "message". This make sense to me but in my case this is conflicting with "message" key which already exist in the event.

My requirement:

Rename "message" key (Not the entire payload) to "dmsg".

I tried grok filter to rename this, that works perfectly fine. But it creates mess if message key in some syslog event doesn't exist and it simply copy entire syslog event into dmsg. I would like to copy message key into dmsg only when that exist in the syslog event.

I would appreciate if someone could help me.

Regards,

Ajay

So do you want

message: Jul 21 01:19:57.58 172.20.20.100 ...
dmsg: Appliance Access Settings ...

or

message: Appliance Access Settings ...
dmsg: Jul 21 01:19:57.58 172.20.20.100 ...

Yes this is what I would like to achieve.

And you use a grok filter to parse the event and extract the log_type, log_component, ..., message fields? If so, why not just extract the message to dmsg instead of message. If you show your current configuration it'll be easier to help.

I am not sure how to extract a field named as "message" from grok, I would appreciate if you could guide me or redirect me to some article or doc.

Extracting means old message key won't be part of event and a new key dmsg would be in the event. Is this a correct understanding?

I repeat: If you show your current configuration it'll be easier to help.

Here is my current configuration for testing purpose:

filter {
if ("cloned" not in [tags]) {
clone {
clones => [ "cyberoam.raw" ]
add_tag => ["cloned"]
}
}

    if [type] == "cyberoam.input" {

            kv { }

            grok {
                    patterns_dir => ["/etc/logstash/patterns"]
                    match => ["message", "%{CRv11}"]
            }

            syslog_pri { }

            mutate {
                    rename => [ "src_ip" , "srcip" ]
                    rename => [ "dst_ip" , "dstip" ]
                    rename => [ "message", "dmsg"]
                    rename => [ "device_name", "devname"]
                    rename => [ "device_id", "devid"]
			}
	}

}

output {

if [type] == "cyberoam.input" {
stdout {
codec => rubydebug { metadata => true }
}

    stdout { }

}

if [type] == "cyberoam.raw" { stdout { } }

}

In above, I am just renaming the field message to dmsg as it was creating problem in ES index mapping.

And what's the definition of the CRv11 grok pattern?

CRv11 ^<%{NONNEGINT:syslog_priority}>date=%{CRDATE:date} time=%{TIME:time} timezone=%{TZ:timezone} device_name=%{WORD:device_name} device_id=%{WORD:device_id} log_id=%{NUMBER:log_id} log_type=%{WORD:log_type} log_component=%{WORD:log_component} log_subtype=%{WORD:log_subtype} status=%{WORD:status} priority=%{WORD:priority} fw_rule_id=%{NUMBER:fw_rule_id} user_name=%{WORD:user_name} user_gp=%{WORD:user_gp} iap=%{NUMBER:iap} category=%{WORD:category} category_type=%{WORD:category_type} url=%{WORD:url} contenttype=%{WORD:contenttype} override_token=%{WORD:override_token} httpresponsecode=%{WORD:httpresponsecode} sourceip=%{IP:src_ip} dst_ip=%{IP:dst_ip} protocol=%{NUMBER:protocol} src_port=%{NUMBER:src_port} dst_port=%{NUMBER:dst_port} sent_bytes=%{NUMBER:sent_bytes} recv_bytes=%{NUMBER:recv_bytes} domain=%{WORD:domain}

Above pattern doesn't contain "message" field, as I have not yet completed writing all fields in the pattern.

Okay, but when you do complete the pattern you could say

message=%{SOMEGROKPATTERN:dmsg}

instead of

message=%{SOMEGROKPATTERN:message}

to capture directly to the dmsg field.

But really, you should use a kv filter for parsing these key/value pairs instead of writing a really long grok pattern that'll break for even small changes in the log data.

I am not sure how to write KV filters. If you could just give me a hint it would be really great.

Use a grok filter to extract syslog_priority and put the rest of the string into a field that you feed a kv filter. There are tons of kv filter examples available online (e.g. in this forum) so I won't write an example for that.

Ok, let me learn and try that. Thank you for your prompt responses.

I am facing problem with KV:

Below is sample event:

2016-08-15T12:43:04.478Z 103.240.35.216 <190>date=2016-08-15 time=18:13:16 timezone="IST" device_name="CR25iNG" device_id=C2222-123 log_id=010302602002 log_type="Firewall" log_component="Appliance Access" log_subtype="Denied" status="Deny" priority=Information duration=0 fw_rule_id=0 user_name="" user_gp="" iap=0 ips_policy_id=0 appfilter_policy_id=0 application="" application_risk=0 application_technology="" application_category="" in_interface="PortA" out_interface="" src_mac=44:d9:e7:ba:5b:6c src_ip=172.16.16.19 src_country_code= dst_ip=255.255.255.255 dst_country_code= protocol="UDP" src_port=45541 dst_port=10001 sent_pkts=0 recv_pkts=0 sent_bytes=0 recv_bytes=0 tran_src_ip= tran_src_port=0 tran_dst_ip= tran_dst_port=0 srczonetype="" srczone="" dstzonetype="" dstzone="" dir_disp="" connid="" vconnid=""

Below is the KV filter output:

"@version" => "1",
"@timestamp" => "2016-08-16T13:48:30.602Z",
"type" => "cyberoam.input",
"host" => "ip-172-31-6-249",
"time" => "18:13:16",
"timezone" => "IST",
"status" => "Deny",
"priority" => "Information",
"duration" => "0",
"iap" => "0",
"application" => "",
"application_risk" => "0",
"application_technology" => "",
"application_category" => "",
"dst_country_code" => "protocol=UDP",
"recv_pkts" => "0",
"tran_src_ip" => "tran_src_port=0",
"tran_dst_ip" => "tran_dst_port=0",
"srczonetype" => "",
"srczone" => "",
"dstzonetype" => "",
"dstzone" => "",
"dir_disp" => "",
"syslog_severity_code" => 5,
"syslog_facility_code" => 1,
"syslog_facility" => "user-level",
"syslog_severity" => "notice",
"date" => "2016-08-15",

Problem:

"tran_src_ip" => "tran_src_port=0",
"tran_dst_ip" => "tran_dst_port=0",

Above is due to empty keys "tran_src_ip" and "tran_dst_ip".

Could any one please help me to get this fixed?

Perhaps you can use a mutate filter and its gsub option to replace occurrences of "\w+= " with an empty string.

It looks like tran_scr_ip and tran_dst_ip have no values defined, which causes the odd parsing. You should be able to add a default value, e.g. "", by substituting =\w with ="".

Hi Both,

I tried various gsub options but somehow it looks like that gsub is not evaluating event.

Below are configs:

          mutate {
                    gsub => [ 'message', '=\w', '=""' ]
            }


          mutate {
                    gsub => [ 'message', '=\s', '=""' ]
            }

          mutate {
                    gsub => [ 'message', '\w+=', '""' ]
            }

Please guide.

Why not try the suggestion I made?

This seems to work for me:

mutate {
  gsub => [ 'message', '= ', '="" ' ]
}