CEF codec outputting multiple CEF logs into single CEF l

Hey Everyone,
I am having an issue when I output to a syslog server using the CEF codec, with logs we receive from Filebeat. Multiple individual CEF logs end up combined into one, so sometimes you get 3 different logs in one CEF packet. This has been confirmed by running tcpdump on the server receiving the logs. Is there a way to break these up? Below is an example of several CEF logs being treated as one individual event.

CEF:0|Elasticsearch|Logstash|1.0|Logstash|Logstash|6|start=05-JUL-21 00:00:50 cs1=1 cs2=test_user cs3=Z/dtEse7dwP2VEVRwULGNwhEur0= cs4=TEST Usageview src=1.1.1.1 act=Authentication Successful deviceProduct=TEST hdeviceVendor=TEST_VendorCEF:0|Elasticsearch|Logstash|1.0|Logstash|Logstash|6|start=05-JUL-21 00:01:06 cs1=1 cs2=test_user cs3=WxAcnDCm6yJP/jWmjtwsXe8UAdM= cs4=Hybri src=172.19.31.250 act=Auth Success deviceProduct=TEST deviceVendor=TEST_VendorCEF:0|Elasticsearch|Logstash|1.0|Logstash|Logstash|6|start=05-JUL-21 00:01:11 cs1=5 cs2=test_user.user1 cs3=VO2m0oHLoQh+vIWKKWdoc1k0epg= cs4=TEST view src=1.1.1.1 act=Authorization Successful deviceProduct=TEST deviceVendor=TEST_VendorCEF:0|Elasticsearch|Logstash|1.0|Logstash|Logstash|6|start=05-JUL-21 00:01:12 cs1=5 cs2=ca66489 cs3=8QkZ3G3GDkT5iY5VyfJ9dvY5dg0= cs4=Tech src=1.1.1.1 act=Authorization Successful deviceProduct=TEST deviceVendor=TEST_Vendor

My Output statement is as follows:

output
{
tcp
{
host => ["SmartConnector"]
port => "514"
codec => cef { reverse_mapping => false fields => [ "start", "cs1", "cs2", "cs3", "cs4", "src", "act", "deviceProduct", "deviceVendor" ] }
}
}  

You could use

    ruby {
        code => '
            matches = event.get("message").scan(/(CEF:0.*?)(?=(CEF:0|$))/)
            #event.remove("message")
            interestingMatches = []
            matches.each { |x| interestingMatches << x[0] }
            event.set("message", interestingMatches)
        '
    }
    split { field => "message" }

The interestingMatches bit is needed because the parentheses used for alternation in (CEF:0|$) also make it a capture group, so there are two capture groups for every match and we only want the first one: (CEF:0.*?)

That will produce four events like

     "message" => "CEF:0|Elasticsearch|Logstash|1.0|Logstash|Logstash|6|start=05-JUL-21 00:00:50 cs1=1 cs2=test_user cs3=Z/dtEse7dwP2VEVRwULGNwhEur0= cs4=TEST Usageview src=1.1.1.1 act=Authentication Successful deviceProduct=TEST hdeviceVendor=TEST_Vendor",

I don't think your output codec will do what you want but that's a separate question.

.scan searches the string for occurrences of the regular expression. The regexp matches "CEF:0" plus non-greedy additional text up until it finds an occurrence of either "CEF:0" or $ (the end of the string). The non-greedy part is important:

/(CEF:0.*?)(?=(CEF:0|$))/

will capture four matches. It saves the match the first time the lookahead hits.

/(CEF:0.*)(?=(CEF:0|$))/

will capture the entire string in one match. It does not stop and save the match until the last time the lookahead hits (at end of string).
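To see the difference outside Logstash, here is a minimal standalone sketch in plain Ruby. The input line is a shortened, made-up sample with only two concatenated records, and the variable names are just for illustration.

```ruby
# Two concatenated CEF records (shortened, made-up sample data).
line = "CEF:0|V|P|1.0|a|b|6|src=1.1.1.1 act=OneCEF:0|V|P|1.0|a|b|6|src=2.2.2.2 act=Two"

# Non-greedy: each match stops at the first position where the lookahead succeeds.
# scan returns one array per match holding both capture groups; keep the first.
lazy = line.scan(/(CEF:0.*?)(?=(CEF:0|$))/).map { |groups| groups[0] }

# Greedy: .* runs to the end of the string, so the whole line is a single match.
greedy = line.scan(/(CEF:0.*)(?=(CEF:0|$))/).map { |groups| groups[0] }

puts lazy.length    # => 2
puts greedy.length  # => 1
```

The `.map { |groups| groups[0] }` step does the same job as the interestingMatches loop in the filter above.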

I am so sorry. I forgot to mention that my filter takes a non-CEF event and converts it to CEF format, and that's where the trouble is: it seems to put a bunch of them together.
The log files look like the line below, followed by my filter.

"message":"05-JUL-21 00:00:50|1.1.1.1|1|CN=test_user,OU=test Users,OU=test,OU=Business,DC=tes,DC=corp,DC=abc,DC=ca|Z/dtEse7dwP2VEVRwULGNwhEur0=|App"

filter
{
grok   {
        pattern_definitions => {
            "CUSTOMMONTH" => "(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)"
            "CUSTOMTIMESTAMP" => "%{MONTHDAY}-%{CUSTOMMONTH}-%{YEAR} %{TIME}"
        }
         match => { "message" => [ "%{CUSTOMTIMESTAMP:timestamp}\|(%{IPV4:src})?\|%{WORD:cs1}\|(%{GREEDYDATA:dn})?\|(%{NOTSPACE:cs3})?\|(%{GREEDYDATA:cs4})?" ] }

         }
         
grok
       {
        match => { "dn" => [ "(\w+\=%{NOTSPACE:cs2}\,)?" ] }
        }
       
if [dn] =~ /^session ID=/ { mutate { remove_field => [ "dn" ] } }

date {
        match => [ "timestamp", "dd-MMM-yy HH:mm:ss" ]
        target => "@timestamp"
      }

       
mutate {
        add_field => { "deviceProduct" => "test"}
        add_field => { "deviceVendor" => "test"}
        
       }
       
translate {
        field => "[cs1]"
        destination => "[event.outcome]"
        dictionary => {
          "1" => "Authentication Successful"
          "2" => "Authentication Failed"
          "3" => "Authentication Attempt - unknown user"
          "5" => "Authorization Successful"
          "6" => "Authorization Failed"
          "10" => "Logout/Timeout"
          "12" => "Validation Failed"
        }
        fallback => "unknown"

      }
     
       
mutate {

rename              => {"timestamp"                   =>     "start"}       
rename              => {"event.outcome"               =>     "act"}          
        }

}
 
output
{
tcp
{
host => ["SmartConnector"]
port => "514"
codec => cef { reverse_mapping => false fields => [ "start", "cs1", "cs2", "cs3", "cs4", "src", "act", "deviceProduct", "deviceVendor" ] }
}
}  

You have not set a delimiter on the codec, and the codec defaults to not using one (the default value for that option is an empty string). If whatever you are sending data to expects delimited messages, then you will need to set it. And given that TCP is a stream-based protocol, it is pretty much certain that the receiver expects a delimiter.
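Something like the following might do it. This is a sketch that assumes the receiver splits messages on a newline; check what your SmartConnector actually expects and use that instead. As far as I know the codec turns a literal \n in this option into a real newline character, but it is worth confirming with tcpdump.

```
output {
  tcp {
    host => ["SmartConnector"]
    port => "514"
    codec => cef {
      # Assumption: the receiver expects newline-delimited messages
      delimiter => "\n"
      reverse_mapping => false
      fields => [ "start", "cs1", "cs2", "cs3", "cs4", "src", "act", "deviceProduct", "deviceVendor" ]
    }
  }
}
```

With a delimiter set, each encoded event ends with that string, so the receiver can tell where one CEF message stops and the next one starts.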

Thanks. Worked