Good evening,
I have a rather extensive .conf file that uses the date filter plugin, but the filter does not appear to be working. Its target is @timestamp, yet the timestamp shown in Kibana is not the original event's timestamp. Details are shown below:
my .conf file
# Receive events as UDP datagrams on port 1517.
# NOTE(review): the sample command in this post pipes to `nc 192.168.1.172 9013`
# (nc speaks TCP unless -u is given) - confirm the test event actually reaches this input.
input {
  udp {
    port => 1517
  }
}
#------------------------------- [START] ----------------------------------------------------#
filter {
# Path of the CSV lookup table, stored on the event so the ruby filter further down can read it.
mutate {
  add_field => { "Error_Codes_To_Technique" => "/etc/logstash/conf.d/Error_Codes_To_Technique.csv" }
}
# mandatory fields
# Keep the raw line under [event][original] and stamp the static observer details.
mutate {
  rename => { "[message]" => "[event][original]" }
  add_field => {
    "[observer][vendor]" => "Mimecast"
    "[observer][product]" => "Email Security Targeted Threat Protection"
  }
}
# Break the raw event into top-level fields: pairs are separated by "|", key and value by "=".
kv {
  source => "[event][original]"
  field_split => "|"
  value_split => "="
}
# @timestamp
# Overwrite the ingest-time @timestamp with the event's own "datetime" value.
# The sample event carries "2020-06-10T11:06:32-0400" (seconds precision, UTC offset
# without a colon), so an explicit Joda pattern for exactly that layout is listed after
# ISO8601 as a fallback; patterns are tried in order.
# If no pattern matches, @timestamp is left at ingest time and the event is tagged
# "_dateparsefailure" (the plugin's default tag_on_failure). "tags" is kept by the prune
# whitelist, so look for that tag in Kibana when the timestamp seems wrong.
# NOTE(review): Kibana renders @timestamp in the browser's time zone by default, so a
# correctly parsed value can still look shifted relative to the raw -0400 string.
date {
  match => ["datetime", "ISO8601", "yyyy-MM-dd'T'HH:mm:ssZ"]
  target => "@timestamp"
}
# file.hash, rule.id, rule.description
# Move the attachment hash and the rejection code/info into their nested target fields.
mutate {
  rename => {
    "sha256"  => "[file][hash]"
    "RejCode" => "[rule][id]"
    "RejInfo" => "[rule][description]"
  }
}
# event.category
# Classify the event by which fields are present; the first matching branch wins.
# DeliveredTemp is a string copy of Delivered made via sprintf. When Delivered is absent
# it holds the unresolved text "%{Delivered}", which matches neither 'true' nor 'false'.
mutate {
  add_field => { "DeliveredTemp" => "%{Delivered}" }
}
if [urlCategory] or [sourceIP] {
  mutate { add_field => { "[event][category]" => "ttp_url" } }
} else if [RcptActType] {
  mutate { add_field => { "[event][category]" => "jrnl" } }
} else if [DeliveredTemp] == 'true' or [DeliveredTemp]=='false' {
  mutate { add_field => { "[event][category]" => "delivery" } }
} else if [Dir] and [Rcpt] and ![Delivered] {
  mutate { add_field => { "[event][category]" => "receipt" } }
} else if [Act] and ![Dir] {
  mutate { add_field => { "[event][category]" => "process" } }
} else if [fileName] {
  mutate { add_field => { "[event][category]" => "ttp_ap" } }
}
# threat.technique
# Maps [rule][id] / [rule][description] to [threat][technique] using the CSV file whose
# path is stored in the Error_Codes_To_Technique field.
# A row matches when its Code equals the rule id, its Keyword is not 'empty', and the
# rule description contains the Keyword. The first matching row's Technique is written
# to [threat][technique] unless that Technique is 'empty'.
# If the ruby code raises (e.g. the CSV cannot be read), the ruby filter tags the event
# "_rubyexception" by default.
if [rule][id] and [rule][description] {
  ruby {
    code => "
      require 'csv'
      filePath = event.get('Error_Codes_To_Technique')
      eventCode = event.get('[rule][id]')
      eventDescription = event.get('[rule][description]')
      # The file is read and parsed on every event; the header row supplies the column names.
      table = CSV.parse(File.read(filePath), headers: true)
      row = table.find { |r| r['Code'].to_s == eventCode and r['Keyword'].to_s != 'empty' and eventDescription.include? r['Keyword'].to_s }
      if row and row['Technique'].to_s != 'empty'
        event.set('[threat][technique]', row['Technique'].to_s)
      end
    "
  }
}
# Default technique by category when no [threat][technique] has been set yet.
if ![threat][technique] and [event][category] == "ttp_ap" {
  mutate { add_field => { "[threat][technique]" => "Spearphishing Attachment" } }
} else if ![threat][technique] and [event][category] == "ttp_url" {
  mutate { add_field => { "[threat][technique]" => "Spearphishing Link" } }
}
# observer.type
# A file hash means Anti-Malware, an "Exfiltration" technique means DLP,
# and everything else is labelled Email Protection.
if [file][hash] {
  mutate { add_field => { "[observer][type]" => "Anti-Malware" } }
} else if [threat][technique] == "Exfiltration" {
  mutate { add_field => { "[observer][type]" => "DLP" } }
} else {
  mutate { add_field => { "[observer][type]" => "Email Protection" } }
}
# network.direction
# Take the direction from Dir, else from Route (ttp_ap events only), else from route.
if [Dir] {
  mutate { rename => { "Dir" => "[network][direction]" } }
} else if [Route] and [event][category] == "ttp_ap" {
  mutate { rename => { "Route" => "[network][direction]" } }
} else if [route] {
  mutate { rename => { "route" => "[network][direction]" } }
}
# Bulk field mapping: move the remaining source keys into their nested target fields.
# Several source keys share one target (e.g. Rcpt / recipient / Recipient), so the
# entry order below is kept exactly as it was.
mutate {
  rename => {
    "Attempt"   => "[custom][num1]"
    "MsgSize"   => "[custom][num2]"
    "SpamScore" => "[custom][num3]"
    "aCode"     => "[custom][str1]"
    "Subject"   => "[custom][str2]"
    "Rcpt"      => "[destination][user][email]"
    "recipient" => "[destination][user][email]"
    "Recipient" => "[destination][user][email]"
    "Act"       => "[event][action]"
    "Latency"   => "[event][duration]"
    "MsgId"     => "[event][id]"
    "Cphr"      => "[rule][name]"
    "RejType"   => "[rule][category]"
    "AttNames"  => "[file][name]"
    "fileName"  => "[file][name]"
    "AttSize"   => "[file][size]"
    "Size"      => "[file][size]"
    "TlsVer"    => "[network][application]"
    "acc"       => "[observer][hostname]"
    "url"       => "[url][original]"
  }
}
# destination.bytes
# Snt becomes [destination][bytes] only when the direction contains inbound/internal (case-insensitive).
if [network][direction] =~ /(?i)(inbound|internal)/ {
  mutate { rename => { "Snt" => "[destination][bytes]" } }
}
# event.outcome
# Delivered containing "true" means SUCCESS; Delivered containing "false", or the
# presence of a rule id, means FAIL.
if [Delivered] =~ /(?i)(true)/ {
  mutate { add_field => { "[event][outcome]" => "SUCCESS" } }
} else if [Delivered] =~ /(?i)(false)/ or [rule][id] {
  mutate { add_field => { "[event][outcome]" => "FAIL" } }
}
# destination.ip / source.ip
# The IP key is the destination for delivery events and the source for ttp_ap events.
if [event][category] == "delivery" {
  mutate { rename => { "IP" => "[destination][ip]" } }
} else if [event][category] == "ttp_ap" {
  mutate { rename => { "IP" => "[source][ip]" } }
}
# source.user.email
# Prefer headerFrom; otherwise fall back to Sender / sender.
if [headerFrom] {
  mutate { rename => { "headerFrom" => "[source][user][email]" } }
} else {
  mutate {
    rename => {
      "Sender" => "[source][user][email]"
      "sender" => "[source][user][email]"
    }
  }
}
# event.response
# Block for ttp_url events or actions containing "rej"; Quarantine when Hld is present; Alert otherwise.
if [event][category] == "ttp_url" or [event][action] =~ /(?i)(rej)/ {
  mutate { add_field => { "[event][response]" => "Block" } }
} else if [Hld] {
  mutate { add_field => { "[event][response]" => "Quarantine" } }
} else {
  mutate { add_field => { "[event][response]" => "Alert" } }
}
# event.verdict
# IPThreadDictTemp is a string copy of IPThreadDict made via sprintf, so the regex below
# always has a string to test.
mutate {
  add_field => { "IPThreadDictTemp" => "%{IPThreadDict}" }
}
if [IPThreadDictTemp] =~ /(?i)(true)/ {
  mutate { add_field => { "[event][verdict]" => "Suspicious" } }
} else if [reason] =~ /(?i)(malicious)/ {
  mutate { add_field => { "[event][verdict]" => "Malicious" } }
}
# threat.name
# Extract the signature name from the Virus field: the text after "clam.[" if present,
# otherwise the text after the Sophos prefix, up to the first ",", "|", "^" or "]".
# The sample event in this post uses "soph.[" for the Sophos engine, so the final "o" is
# optional here; the previous "sopho\.\[" pattern could never match that form.
# Patterns are tried in order and grok stops at the first match (break_on_match defaults
# to true), so a clam match still takes precedence.
# tag_on_failure is emptied so events without a Virus match are not tagged.
grok {
  match => {
    "Virus" => [
      "clam\.\[(?<threatNameTemp>[^\,|^\]]*)",
      "sopho?\.\[(?<threatNameTemp>[^\,|^\]]*)"
    ]
  }
  tag_on_failure => []
}
mutate { rename => { "threatNameTemp" => "[threat][name]" } }
# event.type
# Mark the event as a bounce when either the sender or the recipient address contains "bounce".
if [source][user][email] =~ /(?i)(bounce)/ or [destination][user][email] =~ /(?i)(bounce)/ {
  mutate { add_field => { "[event][type]" => "bounce" } }
}
# could be removed in future versions
# ttp_ap events that carry no action get the default action "Email".
if [event][category] =~ /(?i)(ttp_ap)/ and ![event][action] {
  mutate { add_field => { "[event][action]" => "Email" } }
}
# convert to array
# Split the ";"-separated address and technique strings into arrays.
mutate {
  split => {
    "[source][user][email]" => ";"
    "[destination][user][email]" => ";"
    "[threat][technique]" => ";"
  }
}
# Drop every top-level field whose name matches none of the whitelist patterns below.
prune { # NOTE !!! more OR less fields might be needed here
  whitelist_names => [
    "^custom", "^destination$", "^event$", "^file$", "^observer$", "^source$", "^threat$", "^@timestamp$",
    "^network$", "^url$", "^rule$", "^tags$"
  ]
}
}
#------------------------------- [END] -----------------------------------------------------#
# Ship every event to the "mimecast-000001" index on the Elasticsearch host below.
# NOTE(review): the host URL and credentials look like placeholders and are hard-coded
# in the file - confirm the real values and consider the Logstash keystore for the password.
output {
  elasticsearch {
    hosts    => ["https://elastichosturl:port"]
    user     => "elastic"
    password => "changeme"
    codec    => json
    index    => "mimecast-000001"
  }
}
Here is some original event data that is coming in to my Logstash server:
Original Event
echo 'datetime=2020-06-10T11:06:32-0400|aCode=caJFnXWFPr26dLiKJiyDhA|acc=CUSA105A194|IP=188.93.124.230|RejType=Virus Signature Detection|Error=Malware detected by AV Scan policy: [clam.[MC-Trojan.Gen.IE], soph.[CXmail/IsoDl-A], mave.[CXmail/IsoDl-A, MC-Trojan.Gen.IE]]|RejCode=554|Dir=Inbound|MsgId=<20200610075802.084E0EAFCB9C1F85@dagmuhendislik.com>|Subject=NEW PURCHASE ORDER 156921|headerFrom=admin@dagmuhendislik.com|Sender=admin@dagmuhendislik.com|Virus=[clam.[MC-Trojan.Gen.IE], soph.[CXmail/IsoDl-A], mave.[CXmail/IsoDl-A, MC-Trojan.Gen.IE]]|Rcpt=hdawson@redlobster.com|Act=Rej|RejInfo=[clam.[MC-Trojan.Gen.IE], soph.[CXmail/IsoDl-A], mave.[CXmail/IsoDl-A, MC-Trojan.Gen.IE]]|TlsVer=TLSv1.2|Cphr=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' | nc 192.168.1.172 9013
Also, here is a snippet from my Kibana instance showing the original event timestamp alongside the ingested timestamp, which is completely wrong.
Kibana
Any assistance would be greatly appreciated.
Thanks,