Date filter plugin not working as expected

Good evening,

I have a rather extensive .conf file that uses the Date filter plugin, but the filter is not actually working. I am setting its target to @timestamp, yet the timestamp shown in Kibana is not the timestamp from the original event. The details are shown below:

my .conf file

# Receive events as UDP datagrams on port 1517.
input {
	udp { port => 1517 }
}
#------------------------------- [START] ----------------------------------------------------#


filter {

	mutate {add_field => {"Error_Codes_To_Technique" => "/etc/logstash/conf.d/Error_Codes_To_Technique.csv"}}
 	
	# mandatory fields
	mutate { 
		add_field => { 
			"[observer][vendor]" 		=> "Mimecast"
			"[observer][product]" 		=> "Email Security Targeted Threat Protection"
		}			
		rename => { "[message]" 		=> "[event][original]" }
	}


	# Break the payload held in [event][original] into individual fields:
	# pairs are separated by "|" and each key is separated from its value by "=".
	kv {
		source => "[event][original]"
		value_split => "="
		field_split => "|"
	}

	# @timestamp
	# Parse the [datetime] field as an ISO8601 timestamp and store the result
	# in @timestamp.
	date {
		match  => ["datetime", "ISO8601"]
		target => "@timestamp"
	}

	# file.hash, event.code
	mutate { 
		rename => {
			"sha256" 	=> "[file][hash]"
			"RejCode" 	=> "[rule][id]" 
			"RejInfo" 	=> "[rule][description]"
		}
	}
	
	# event.category
	mutate {
		add_field => {"DeliveredTemp" => "%{Delivered}"}
	}
	if [urlCategory] or [sourceIP] {
		mutate { add_field =>  {"[event][category]" => "ttp_url"}}
	} 
	else if [RcptActType] {
		mutate { add_field =>  {"[event][category]" => "jrnl"}}
	}
	else if [DeliveredTemp] == 'true' or [DeliveredTemp]=='false' {
		mutate { add_field =>  {"[event][category]" => "delivery"}}
	} 
	else if [Dir] and [Rcpt] and ![Delivered] {
		mutate { add_field =>  {"[event][category]" => "receipt"}}
	} 
	else if [Act] and ![Dir] {
		mutate { add_field =>  {"[event][category]" => "process"}}
	} 
	else if [fileName] {
		mutate { add_field =>  {"[event][category]" => "ttp_ap"}}
	}

	# threat.technique
	# Maps [rule][id] / [rule][description] to [threat][technique] using the CSV
	# file whose path is stored in the Error_Codes_To_Technique field.
	# The CSV is read with a header row and uses the Code, Keyword and Technique
	# columns; the literal value 'empty' marks an unused Keyword or Technique cell.
	# A row matches when its Code equals the rule id and its Keyword occurs in the
	# rule description; the first matching row wins.

	if [rule][id] and [rule][description]{
		ruby {
			code => "
				require 'csv'
				filePath = event.get('Error_Codes_To_Technique')
				eventCode = event.get('[rule][id]').to_s
				eventDescription = event.get('[rule][description]')
				# Parse the file once per event; the earlier separate header read was never used
				table = CSV.read(filePath, headers: true)
				row = table.find { |candidate|
					keyword = candidate['Keyword'].to_s
					candidate['Code'].to_s == eventCode && keyword != 'empty' && eventDescription.include?(keyword)
				}
				if row && row['Technique'].to_s != 'empty'
					event.set('[threat][technique]', row['Technique'].to_s)
				end
			"
		}
	}
	if ![threat][technique] {
		if [event][category] == "ttp_ap" {
			mutate { add_field =>  {"[threat][technique]" => "Spearphishing Attachment"}}
		} else if [event][category] == "ttp_url" {
			mutate { add_field =>  {"[threat][technique]" => "Spearphishing Link"}}
		}
	}
	
	# observer.type
	if [file][hash] {
		mutate { add_field =>  {"[observer][type]" => "Anti-Malware"}}
	} else if [threat][technique] == "Exfiltration" {
		mutate { add_field =>  {"[observer][type]" => "DLP"}}
	} else {
		mutate { add_field =>  {"[observer][type]" => "Email Protection"}}
	}

	# network.direction
	if [Dir] {
		mutate { rename =>  { "Dir" => "[network][direction]" }}
	} else if [Route] and [event][category] == "ttp_ap" {
		mutate { rename =>  { "Route" => "[network][direction]" }}
	} else if [route] {
		mutate { rename =>  { "route" => "[network][direction]" }}
	}

	mutate { 
		rename =>  { 
			"Attempt" 	=> "[custom][num1]"
			"MsgSize" 	=> "[custom][num2]"
			"SpamScore" => "[custom][num3]"
			"aCode" 	=> "[custom][str1]"
			"Subject" 	=> "[custom][str2]"
			"Rcpt" 		=> "[destination][user][email]"
			"recipient" => "[destination][user][email]"
			"Recipient" => "[destination][user][email]"
			"Act" 		=> "[event][action]"
			"Latency" 	=> "[event][duration]"
			"MsgId" 	=> "[event][id]"
			"Cphr" 		=> "[rule][name]"
			"RejType" 	=> "[rule][category]"
			"AttNames" 	=> "[file][name]"
			"fileName" 	=> "[file][name]"
			"AttSize" 	=> "[file][size]"
			"Size" 		=> "[file][size]"
			"TlsVer" 	=> "[network][application]"
			"acc" 		=> "[observer][hostname]"
			"url" 		=> "[url][original]"	
		} 
	}

	# destination.bytes
	if [network][direction] =~ /(?i)(inbound|internal)/ {
		mutate { rename =>  { "Snt" => "[destination][bytes]" }}
	}

	# event.outcome
	if [Delivered] =~ /(?i)(true)/ {
		mutate { add_field =>  {"[event][outcome]" => "SUCCESS"}}
	} else if [Delivered] =~ /(?i)(false)/ {
		mutate { add_field =>  {"[event][outcome]" => "FAIL"}}
	} else if [rule][id] {
		mutate { add_field =>  {"[event][outcome]" => "FAIL"}}
	}

	# destination.ip
	if [event][category] == "delivery" {
		mutate { rename =>  { "IP" => "[destination][ip]" }}
	}

	# source.ip
	if [event][category] == "ttp_ap" {
		mutate { rename =>  { "IP" => "[source][ip]" }}
	}

	# source.user.email
	if [headerFrom] {
		mutate { rename =>  {"headerFrom" => "[source][user][email]"}}
	} 
	else {
		mutate { 
			rename =>  { 
				"Sender" => "[source][user][email]"
				"sender" => "[source][user][email]"
			}
		}
	}

	# event.response
	if [event][category] == "ttp_url" or [event][action] =~ /(?i)(rej)/ {
		mutate { add_field =>  {"[event][response]" => "Block"}}
	}
	else if [Hld] {
		mutate { add_field =>  {"[event][response]" => "Quarantine"}}
	} else {
		mutate { add_field =>  {"[event][response]" => "Alert"}}
	}

	# event.verdict 
	mutate {add_field => {"IPThreadDictTemp" => "%{IPThreadDict}"}}
	if [IPThreadDictTemp] =~ /(?i)(true)/ {
		mutate { add_field =>  {"[event][verdict]" => "Suspicious"}}
	} else if [reason] =~ /(?i)(malicious)/ {
		mutate { add_field =>  {"[event][verdict]" => "Malicious"}}
	}

	# threat.name
	# Take the first signature name listed for the clam engine in [Virus]; if
	# there is none, fall back to the first name listed for the soph engine.
	# The captured name ends at the first ",", "|", "^" or "]".
	# tag_on_failure is emptied so events without a match are not tagged.
	grok {
		match => { "Virus" => "clam\.\[(?<threatNameTemp>[^\,|^\]]*)" }
		tag_on_failure => []
	}
	if ![threatNameTemp]{
		grok {
			# "sopho?" accepts the engine tag written as "soph" (e.g. soph.[CXmail/IsoDl-A])
			# as well as the previously expected "sopho".
			match => { "Virus" => "sopho?\.\[(?<threatNameTemp>[^\,|^\]]*)" }
			tag_on_failure => []
		}
	}

	mutate { rename =>  { "threatNameTemp" => "[threat][name]" }}

	#event.type
	if [source][user][email] =~ /(?i)(bounce)/ or [destination][user][email] =~ /(?i)(bounce)/ {
		mutate {add_field => {"[event][type]" => "bounce"}}
	}

	#could be removed in future versions
	if [event][category] =~ /(?i)(ttp_ap)/ and ![event][action]{
		mutate {add_field => {"[event][action]" => "Email"}}
	}

	#convert to array
	mutate { split => { "[source][user][email]" => ";" }}
	mutate { split => { "[destination][user][email]" => ";" }}
	mutate { split => { "[threat][technique]" => ";" }}

	prune { # NOTE !!! more OR less fields might be needed here 
		whitelist_names => ["^custom","^destination$", "^event$","^file$" , "^observer$", "^source$" ,"^threat$" ,"^@timestamp$"
		,"^network$","^url$","^rule$","^tags$"]
    }
}

#------------------------------- [END] -----------------------------------------------------#
# Send every event to Elasticsearch, writing into the fixed index mimecast-000001.
output {
	elasticsearch {
		index    => "mimecast-000001"
		hosts    => ["https://elastichosturl:port"]
		user     => "elastic"
		password => "changeme"
		codec    => json
	}
}

Here is some original event data that is coming in to my Logstash server:

Original Event

echo 'datetime=2020-06-10T11:06:32-0400|aCode=caJFnXWFPr26dLiKJiyDhA|acc=CUSA105A194|IP=188.93.124.230|RejType=Virus Signature Detection|Error=Malware detected by AV Scan policy: [clam.[MC-Trojan.Gen.IE], soph.[CXmail/IsoDl-A], mave.[CXmail/IsoDl-A, MC-Trojan.Gen.IE]]|RejCode=554|Dir=Inbound|MsgId=<20200610075802.084E0EAFCB9C1F85@dagmuhendislik.com>|Subject=NEW PURCHASE ORDER 156921|headerFrom=admin@dagmuhendislik.com|Sender=admin@dagmuhendislik.com|Virus=[clam.[MC-Trojan.Gen.IE], soph.[CXmail/IsoDl-A], mave.[CXmail/IsoDl-A, MC-Trojan.Gen.IE]]|Rcpt=hdawson@redlobster.com|Act=Rej|RejInfo=[clam.[MC-Trojan.Gen.IE], soph.[CXmail/IsoDl-A], mave.[CXmail/IsoDl-A, MC-Trojan.Gen.IE]]|TlsVer=TLSv1.2|Cphr=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' | nc 192.168.1.172 9013

Also, here is a snippet from my Kibana instance showing the event original timestamp and the ingested timestamp which is completely wrong.

Kibana

Any assistance would be greatly appreciated.

Thanks,

Note the leading <14> on the [event][original] field. Your kv filter will be creating a field called [<14>datetime]. You do not get a _dateparsefailure because the date filter is a no-op if the source field does not exist.

I would remove it using mutate

mutate { gsub => [ "[event][original]", "^<\d+>", "" ] }

Thank you for this insight! Not to sound like too much of a noob, but should I put the code before or after my kv?

I was thinking after, but figured I would ask just in case.

Thanks,

No, that would have to be before the kv, so that the prefix has already been removed from [event][original] when kv parses it.

@Badger, I made the suggested change to the .conf file as you can see below.

filter {

   mutate {add_field => {"Error_Codes_To_Technique" => "/etc/logstash/conf.d/Error_Codes_To_Technique.csv"}}

   

   # mandatory fields

   mutate { 

       add_field => { 

           "[observer][vendor]"        => "Mimecast"

           "[observer][product]"       => "Email Security Targeted Threat Protection"

       }           

       rename => { "[message]"         => "[event][original]" }

   }

   kv {

       source => "[event][original]"

       value_split => "="

       field_split => "|"

   }

   

   mutate { 

       gsub => [ "[event][original]", "^<\d+>", "" ] 

   }

   # @timestamp

   date {

       match  => ["datetime", "ISO8601"]

       target => "@timestamp"

   }

However, after restarting the Logstash service, I am still seeing the same issue: @timestamp is not the time from the original event, as shown in the new screenshot below:

@Badger,

After I posted, I noticed your comment about moving the snippet you gave to before the "kv" filter. I have since moved it there, and I am still getting the same issue as above.

Any other suggestions would be great.

Thanks,

If you look at the event, does it have a [datetime] field?

@Badger,

Unfortunately a [datetime] field does not show up in kibana for any event.

@Badger,

Well, I went in and refreshed my index patterns after making those changes, and sure enough, all new data coming in now has the proper @timestamp field populated.

Thank you so much for your assistance.

I will close this issue out now.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.