Hi guys,
I have an issue with one of the Logstash pipelines in our stack: incoming CEF logs from CyberArk (CYA) are sometimes split into two events for no apparent reason. I ran tcpdump on the port that CYA sends to and captured the following (redacted) string:
<5>1 2021-06-14T06:17:04Z CYA-VAULT-SERVER CEF:0|Cyber-Ark|Vault|11.4.0000|308|Use Password|5|act=Use Password suser=
This event was split at the input in the middle of a value, which produced the following two documents:
@timestamp: Jun 14, 2021 @ 08:17:09.931
@version: 1
applicationProtocol:
cef.extensions.deviceAddress: 11.206.211.134
cef.version: 0
cefVersion: 0
destination.address: dummy.corp
destination.user.name: CYA-P_SRVAdm1
destination.user.name.text: CYA-P_SRVAdm1
destinationHostName: dummy.corp
destinationUserName: CYA-P_SRVAdm1
deviceAction: Use Password
deviceAddress: 11.206.211.134
deviceCustomString1:
deviceCustomString1Label: "Affected User Name"
deviceCustomString2: CYA-P_SRVAdm1
deviceCustomString2Label: "Safe Name"
deviceCustomString3: Operating System
deviceCustomString3Label: "Device Type"
deviceCustomString4:
deviceCustomString4Label: "Database"
deviceCustomString5Label: "Ot
deviceEventClassId: 308
deviceProduct: Vault
deviceVendor: Cyber-Ark
deviceVersion: 11.4.0000
event.action: Use Password
event.code: 308
event.severity: 5
externalId:
file.name: Root\Operating System-CON_Win-domain-Adm1
fileName: Root\Operating System-CON_Win-domain-Adm1
host.ip: 11.27.22.111
host.port: 56305
name: Use Password
observer.product: Vault
observer.vendor: Cyber-Ark
observer.version: 11.4.0000
port: 56,305
reason:
severity: 5
source.address: 10.215.244.83
source.ip: 10.215.244.83
source.user.name:
@timestamp: Jun 14, 2021 @ 08:17:09.934
@version: 1
host.ip: 10.215.244.83
host.port: 56305
log.original: her info" cs5=11.206.211.134 cn1Label="Request Id" cn1= cn2Label="Ticket Id" cn2=(Action: Connect) msg=(Action: Connect)
message: her info" cs5=11.206.211.134 cn1Label="Request Id" cn1= cn2Label="Ticket Id" cn2=(Action: Connect) msg=(Action: Connect)
port: 56,305
suricata.eve.timestamp: Jun 14, 2021 @ 08:17:09.934
tags: _cefparsefailure
My pipeline configuration is:
input {
  tcp {
    id => "cyberark-tcp-input"
    port => 5146
    codec => "cef"
  }
}
filter {
  if [host] {
    mutate {
      add_field => { "[host_temp]" => "%{[host]}" }
      remove_field => ["host"]
    }
  }
  if [syslog] {
    mutate {
      add_field => { "[syslog_temp]" => "%{[syslog]}" }
      remove_field => ["syslog"]
    }
  }
  if [applicationProtocol] and ![network][protocol] { mutate { add_field => { "[network][protocol]" => "%{[applicationProtocol]}" } } }
  if [cefVersion] and ![cef][version] { mutate { add_field => { "[cef][version]" => "%{[cefVersion]}" } } }
  if [destinationHostName] and ![destination][address] { mutate { add_field => { "[destination][address]" => "%{[destinationHostName]}" } } }
  if [destinationUserName] and ![destination][user][name] { mutate { add_field => { "[destination][user][name]" => "%{[destinationUserName]}" } } }
  if [deviceAction] and ![event][action] { mutate { add_field => { "[event][action]" => "%{[deviceAction]}" } } }
  if [deviceAddress] and ![cef][extensions][deviceAddress] { mutate { add_field => { "[cef][extensions][deviceAddress]" => "%{[deviceAddress]}" } } }
  if [deviceEventClassId] and ![event][code] { mutate { add_field => { "[event][code]" => "%{[deviceEventClassId]}" } } }
  if [deviceProduct] and ![observer][product] { mutate { add_field => { "[observer][product]" => "%{[deviceProduct]}" } } }
  if [deviceVendor] and ![observer][vendor] { mutate { add_field => { "[observer][vendor]" => "%{[deviceVendor]}" } } }
  if [deviceVersion] and ![observer][version] { mutate { add_field => { "[observer][version]" => "%{[deviceVersion]}" } } }
  if [externalId] and ![cef][extensions][externalId] { mutate { add_field => { "[cef][extensions][externalId]" => "%{[externalId]}" } } }
  if [fileName] and ![file][name] { mutate { add_field => { "[file][name]" => "%{[fileName]}" } } }
  if [host_temp] and ![host][ip] { mutate { add_field => { "[host][ip]" => "%{[host_temp]}" } } }
  if [message] and ![log][original] { mutate { add_field => { "[log][original]" => "%{[message]}" } } }
  if [port] and ![host][port] { mutate { add_field => { "[host][port]" => "%{[port]}" } } }
  if [reason] and ![event][reason] { mutate { add_field => { "[event][reason]" => "%{[reason]}" } } }
  if [severity] and ![event][severity] { mutate { add_field => { "[event][severity]" => "%{[severity]}" } } }
  if [sourceHostName] and ![source][address] { mutate { add_field => { "[source][address]" => "%{[sourceHostName]}" } } }
  if [source][address] { mutate { add_field => { "[source][ip]" => "%{[source][address]}" } } }
  if [sourceUserName] and ![source][user][name] { mutate { add_field => { "[source][user][name]" => "%{[sourceUserName]}" } } }
  if [syslog_temp] and ![syslog_message] { mutate { add_field => { "[syslog_message]" => "%{[syslog_temp]}" } } }
  mutate { remove_field => ["host_temp", "syslog_temp"] }
}
filter {
  if [deviceProduct] == "PTA" {
    if [source][ip] == "None" { mutate { remove_field => ["[source][ip]"] } }
    if [source][address] == "None" { mutate { remove_field => ["[source][address]"] } }
    if [sourceHostName] == "None" { mutate { remove_field => ["[sourceHostName]"] } }
  }
}
output {
  file {
    path => "/usr/share/logstash/cyberark-events.txt"
  }
}
I'm not sure where to go from here, as it seems the tcp input splits the incoming event before it even reaches the filters.
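One variation that may be worth testing: TCP is a byte stream, so a single read on the input does not have to line up with one CEF event, and as far as I can tell the cef codec documents a `delimiter` option for this case. Below is a minimal sketch of the input with that option set. It assumes that CyberArk terminates each event with a newline, which I have not confirmed from the capture, so the delimiter value is an assumption and not a verified fix.

```
input {
  tcp {
    id => "cyberark-tcp-input"
    port => 5146
    # Assumption: every event ends with "\n".
    # If the sender terminates events with CRLF, "\r\n" would be needed instead.
    codec => cef { delimiter => "\n" }
  }
}
```

If the delimiter matches what the sender uses, the codec should buffer partial reads until a full event has arrived, so an event should no longer be cut in the middle of a value like `cs5Label` above.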