Logstash TCP input splits events unintentionally

Hi guys,

I have an issue with one of the Logstash pipelines in our stack. The incoming CEF logs from CyberArk sometimes get split into two events for no apparent reason. I ran tcpdump on the port that CYA sends to and captured the following (redacted) payload:

<5>1 2021-06-14T06:17:04Z CYA-VAULT-SERVER CEF:0|Cyber-Ark|Vault|11.4.0000|308|Use Password|5|act=Use Password suser= fname=Root\Operating System-CON_Win-domain-Adm1 dvc=11.206.211.134 shost=10.215.244.83 dhost=dummy.corp duser=CYA-P_SRVAdm1 externalId= app= reason= cs1Label="Affected User Name" cs1= cs2Label="Safe Name" cs2=P-localServer-B-F cs3Label="Device Type" cs3=Operating System cs4Label="Database" cs4= cs5Label="Other info" cs5=11.206.211.134 cn1Label="Request Id" cn1= cn2Label="Ticket Id" cn2=(Action: Connect) msg=(Action: Connect)

This event got split at the input, right in the middle of a value:

@timestamp: Jun 14, 2021 @ 08:17:09.931
@version: 1
applicationProtocol:
cef.extensions.deviceAddress: 11.206.211.134
cef.version: 0
cefVersion: 0
destination.address: dummy.corp
destination.user.name: CYA-P_SRVAdm1
destinationHostName: dummy.corp
destinationUserName: CYA-P_SRVAdm1
deviceAction: Use Password
deviceAddress: 11.206.211.134
deviceCustomString1:
deviceCustomString1Label: "Affected User Name"
deviceCustomString2: CYA-P_SRVAdm1
deviceCustomString2Label: "Safe Name"
deviceCustomString3: Operating System
deviceCustomString3Label: "Device Type"
deviceCustomString4:
deviceCustomString4Label: "Database"
deviceCustomString5Label: "Ot
deviceEventClassId: 308
deviceProduct: Vault
deviceVendor: Cyber-Ark
deviceVersion: 11.4.0000
event.action: Use Password
event.code: 308
event.severity: 5
externalId:
file.name: Root\Operating System-CON_Win-domain-Adm1
fileName: Root\Operating System-CON_Win-domain-Adm1
host.ip: 11.27.22.111
host.port: 56305
name: Use Password
observer.product: Vault
observer.vendor: Cyber-Ark
observer.version: 11.4.0000
port: 56,305
reason:
severity: 5
source.address: 10.215.244.83
source.ip: 10.215.244.83
source.user.name:

sourceHostName: 10.215.244.83
sourceUserName:
suricata.eve.alert.severity: 5
suricata.eve.src_ip: 10.215.244.83
suricata.eve.timestamp: Jun 14, 2021 @ 08:17:09.931
syslog_message: <5>1 2021-06-14T06:17:04Z CYA-VAULT-SERVER

@timestamp: Jun 14, 2021 @ 08:17:09.934
@version: 1
host.ip: 10.215.244.83
host.port: 56305
log.original: her info" cs5=11.206.211.134 cn1Label="Request Id" cn1= cn2Label="Ticket Id" cn2=(Action: Connect) msg=(Action: Connect)
message: her info" cs5=11.206.211.134 cn1Label="Request Id" cn1= cn2Label="Ticket Id" cn2=(Action: Connect) msg=(Action: Connect)
port: 56,305
suricata.eve.timestamp: Jun 14, 2021 @ 08:17:09.934
tags: _cefparsefailure

My pipeline configuration is:

input {
  tcp {
    id => "cyberark-tcp-input"
    port => 5146
    codec => "cef"
  }
}

filter {
  if [host] {
    mutate {
      add_field => { "[host_temp]" => "%{[host]}" }
      remove_field => ["host"]
    }
  }
  if [syslog] {
    mutate {
      add_field => { "[syslog_temp]" => "%{[syslog]}" }
      remove_field => ["syslog"]
    }
  }

  if [applicationProtocol] and ![network][protocol] { mutate { add_field => { "[network][protocol]" => "%{[applicationProtocol]}" } } }
  if [cefVersion] and ![cef][version] { mutate { add_field => { "[cef][version]" => "%{[cefVersion]}" } } }
  if [destinationHostName] and ![destination][address] { mutate { add_field => { "[destination][address]" => "%{[destinationHostName]}" } } }
  if [destinationUserName] and ![destination][user][name] { mutate { add_field => { "[destination][user][name]" => "%{[destinationUserName]}" } } }
  if [deviceAction] and ![event][action] { mutate { add_field => { "[event][action]" => "%{[deviceAction]}" } } }
  if [deviceAddress] and ![cef][extensions][deviceAddress] { mutate { add_field => { "[cef][extensions][deviceAddress]" => "%{[deviceAddress]}" } } }
  if [deviceEventClassId] and ![event][code] { mutate { add_field => { "[event][code]" => "%{[deviceEventClassId]}" } } }
  if [deviceProduct] and ![observer][product] { mutate { add_field => { "[observer][product]" => "%{[deviceProduct]}" } } }
  if [deviceVendor] and ![observer][vendor] { mutate { add_field => { "[observer][vendor]" => "%{[deviceVendor]}" } } }
  if [deviceVersion] and ![observer][version] { mutate { add_field => { "[observer][version]" => "%{[deviceVersion]}" } } }
  if [externalId] and ![cef][extensions][externalId] { mutate { add_field => { "[cef][extensions][externalId]" => "%{[externalId]}" } } }
  if [fileName] and ![file][name] { mutate { add_field => { "[file][name]" => "%{[fileName]}" } } }
  if [host_temp] and ![host][ip] { mutate { add_field => { "[host][ip]" => "%{[host_temp]}" } } }
  if [message] and ![log][original] { mutate { add_field => { "[log][original]" => "%{[message]}" } } }
  if [port] and ![host][port] { mutate { add_field => { "[host][port]" => "%{[port]}" } } }
  if [reason] and ![event][reason] { mutate { add_field => { "[event][reason]" => "%{[reason]}" } } }
  if [severity] and ![event][severity] { mutate { add_field => { "[event][severity]" => "%{[severity]}" } } }
  if [sourceHostName] and ![source][address] { mutate { add_field => { "[source][address]" => "%{[sourceHostName]}" } } }
  if [source][address] { mutate { add_field => { "[source][ip]" => "%{[source][address]}" } } }
  if [sourceUserName] and ![source][user][name] { mutate { add_field => { "[source][user][name]" => "%{[sourceUserName]}" } } }
  if [syslog_temp] and ![syslog_message] { mutate { add_field => { "[syslog_message]" => "%{[syslog_temp]}" } } }

  mutate { remove_field => ["host_temp", "syslog_temp"] }
}

filter {
  if [deviceProduct] == "PTA" {
    if [source][ip] == "None"  { mutate { remove_field => ["[source][ip]"]} }
    if [source][address] == "None"  { mutate { remove_field => ["[source][address]"]} }
    if [sourceHostName] == "None"  { mutate { remove_field => ["[sourceHostName]"]} } 
  }
}

output {
    file {
        path => "/usr/share/logstash/cyberark-events.txt"
    }
}

I'm not sure where to go from here; it seems the TCP input splits the incoming event before it even reaches the filters.

I believe this issue is related. (The issue talks a lot about the multiline codec, but I think the fundamental issue is the buffering.)

Thanks for the reply; it does look like it could be related to the buffer size. The messages really aren't that long, though, so do you think the issue is with the CEF codec? Based on other log sources we have, the TCP input itself should be able to handle even longer messages.

Options could be to ditch the CEF codec and parse manually, or to put Filebeat between the source and Logstash if its CEF module can handle larger input. Do you have an opinion on either?
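For the manual-parsing route, something like the following might work. This is only a rough sketch: it assumes each event arrives newline-terminated (so the line codec handles the framing), and the naive space-based kv split will mangle extension values that themselves contain spaces (e.g. act=Use Password), so those would need extra handling:

```
input {
  tcp {
    id => "cyberark-tcp-input"
    port => 5146
    # Frame on newlines instead of decoding CEF at the input.
    codec => line
  }
}

filter {
  # Split off the syslog header and the pipe-delimited CEF header fields.
  dissect {
    mapping => {
      "message" => "%{syslog_header} CEF:%{cefVersion}|%{deviceVendor}|%{deviceProduct}|%{deviceVersion}|%{deviceEventClassId}|%{name}|%{severity}|%{cef_extensions}"
    }
  }
  # Naive key=value parse of the extensions; breaks on values containing spaces.
  kv {
    source => "cef_extensions"
    field_split => " "
    value_split => "="
  }
}
```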

Not sure. I wonder if you need to set the codec's delimiter option.
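For reference, the CEF codec does accept a delimiter option, so the input could be framed on newlines before decoding. This sketch assumes the sender terminates each event with a newline, which is worth verifying in the tcpdump capture first:

```
input {
  tcp {
    id => "cyberark-tcp-input"
    port => 5146
    # Buffer the TCP stream until a full line arrives, then decode it as CEF.
    codec => cef { delimiter => "\n" }
  }
}
```

With a delimiter set, a message that arrives across multiple TCP segments is reassembled before decoding, instead of each segment being decoded as a separate (truncated) event.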

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.