Multiple pipelines bug with pipe-to-pipe config and CEF codec

Hi, everyone!
I have run into the following problem: several CEF strings pushed through the pipeline configuration below sometimes end up with a _cefparsefailure, because the string arrives at the input corrupted. The original message gets broken at a random place and, as a result, is not parsed properly by the CEF codec. The error appears both in our real ELK stack and in a test environment (the Logstash pipeline tester from epacke).
The reason for the two pipelines is an incorrect timestamp in some messages that I want to fix. Note the 'timestamps' with minus signs in the input strings below.
The bug does not appear on every test run: roughly 1 error per 10 launches.

pipelines.yml:

- pipeline.id: cef_correction_1st_stage
  path.config: "/etc/logstash/conf.d/aix-in.conf"
- pipeline.id: cef_correction_2nd_stage
  path.config: "/etc/logstash/conf.d/aix-out.conf"

aix-in.conf:

input {
  kafka {
    codec => plain {}
    bootstrap_servers => "KAFKA_SERVER"
    topics => "TOPIC_NAME"
    client_id => "logstash01"
    decorate_events => basic
    group_id => "test_group"
  }
}
filter {
  if "-6208" in [message] {
    ruby {
      code => "
        event.set('current_time', Time.now().to_i);
        puts Time.now().to_i
      "
    }
    mutate {
      gsub => ["message", "-6208[0-9]+", "%{current_time}"]
    }
    mutate {
      add_tag => ["Timestamp was changed"]
    }
  }
  mutate {
    add_field => {
     "cefField" => "%{message}"
    }
  }
}
output {
    tcp {
        host => localhost
        port => 12888
        codec => plain {
            format => "%{cefField}"
        }
    }
}
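Outside Logstash, the timestamp-correction step above (the ruby filter plus the mutate/gsub) boils down to a simple regex substitution. A plain-Ruby sketch of what the filter does, with a made-up message string for illustration:

```ruby
# Sketch of the first-stage fix: replace the bogus negative epoch
# values (e.g. -62086925465000) with the current Unix time,
# using the same pattern as the pipeline's gsub: "-6208[0-9]+".
current_time = Time.now.to_i

message = "... start=-62086925465000 end=-62086925465000 rt=-62086925465000 ..."

fixed = message.gsub(/-6208[0-9]+/, current_time.to_s)

puts fixed  # all three broken timestamps now carry the current epoch seconds
```

This is only an illustration of the substitution itself; inside the pipeline the value is carried via the `current_time` event field.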

aix-out.conf:

input {
    tcp {
        host => localhost
        port => 12888       # Listener port (tcp)
        codec => cef {}
    }
}
filter {
  ruby {
    code => "
      event.set('[event][start]', Time.now());
      event.set('[event][end]', Time.now());
    "
  }
}
output {
   elasticsearch {
     hosts => ["ELASTIC_HOST"]
     http_compression => true
     index => "index_test"
     manage_template => true
     template_overwrite => false
     template => "/etc/logstash/conf.d/aix.template"
     user => "${es_pl_login}"
     password => "${es_pl_pwd}"
   }
}

Input data:

CEF:0|IBM|AIX Audit PR||PROC_SetUserIDs|PROC_SetUserIDs|Low| eventId=1856831065 msg=orarootagent.bin                1642298  50072098          effect: 1000, real: 1000, saved: -1, login: -1 start=-62086925465000 end=-62086925465000 categorySignificance=/Informational categoryBehavior=/Modify/Attribute categoryDeviceGroup=/Application catdt=Operating System categoryOutcome=/Attempt categoryObject=/Host/Resource/Process art=1689839632340 deviceSeverity=OK act=OK rt=-62086925465000 suser=user_ru1 sproc=2 sourceServiceName=0 flexString1=20230720105200 cn2=3 cs1Label=ACL cs2Label=Group cs3Label=Owner cs4Label=Reason or Error Code cs5Label=PCL cs6Label=Volume Group ID cn1Label=File Descriptor cn2Label=Parent PID cn3Label=Physical Volume Index ahost=host-spb-best.sims.ru agt=10.0.0.2 agentZoneURI=/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255 amac=00-50-56-B9-68-28 av=8.4.0.8955.0 atz=Europe/Moscow at=sdkmultifolderreader dvchost=hostname-db3 dvc=10.0.0.1 deviceZoneURI=/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255 dtz=Europe/Moscow geid=0 _cefVer=0.1 aid=3fh2sPoEBABCUfDP6qAyT2w\=\=
CEF:0|IBM|AIX Audit PR||PROC_SetUserIDs|PROC_SetUserIDs|Low| eventId=1856831065 msg=orarootagent.bin                1642298  50072098          effect: 1000, real: 1000, saved: -1, login: -1 start=1689856215000 end=1689856215000 categorySignificance=/Informational categoryBehavior=/Modify/Attribute categoryDeviceGroup=/Application catdt=Operating System categoryOutcome=/Attempt categoryObject=/Host/Resource/Process art=1689839632340 deviceSeverity=OK act=OK rt=1689856215000 suser=user_ru1 sproc=2 sourceServiceName=0 flexString1=20230720105200 cn2=3 cs1Label=ACL cs2Label=Group cs3Label=Owner cs4Label=Reason or Error Code cs5Label=PCL cs6Label=Volume Group ID cn1Label=File Descriptor cn2Label=Parent PID cn3Label=Physical Volume Index ahost=host-spb-best.sims.ru agt=10.0.0.2 agentZoneURI=/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255 amac=00-50-56-B9-68-28 av=8.4.0.8955.0 atz=Europe/Moscow at=sdkmultifolderreader dvchost=hostname-db3 dvc=10.0.0.1 deviceZoneURI=/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255 dtz=Europe/Moscow geid=0 _cefVer=0.1 aid=3fh2sPoEBABCUfDP6qAyT2w\=\=

Bug screenshot:

I do not understand the reason, but note that in your screenshot it says "Will send 2 documents", so it seems to be telling you that it will break the line up.

Hey, Badger, thanks for the reply!

Sure, it says so because there are two CEF strings in the input to work on. I attached them in the first post.

What do you have in the logs? If you have _cefparsefailure in the tags field, this means that the message is not a valid CEF message for the codec, and Logstash will emit an ERROR log with more information about it.

Hi, leandrojmp! Thanks for the reply!

The input strings are the same except for the timestamp, which I replace in the first pipeline. I just sometimes get the error for the same input, and sometimes not.


The error log states the obvious:

logstash       | [2023-07-20T18:01:18,561][INFO ][logstash.agent           ] Pipelines running {:count=>4, :running_pipelines=>[:cef_correction_2nd_stage, :"generic-json", :cef_correction_1st_stage, :"f5-syslog"], :non_running_pipelines=>[]}
logstash       | 1689876226
logstash       | [2023-07-20T18:03:46,940][ERROR][logstash.codecs.cef      ][cef_correction_2nd_stage][ad36491453a9a88615aa7135b78889108006eb564854f935136a2c31f1c84d9e] Failed to decode CEF payload. Generating failure event with payload in message field. {:exception=>NoMethodError, :message=>"undefined method `include?' for nil:NilClass", :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-cef-6.2.3-java/lib/logstash/codecs/cef.rb:241:in `handle'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-cef-6.2.3-java/lib/logstash/codecs/cef.rb:200:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-tcp-6.2.7-java/lib/logstash/inputs/tcp.rb:187:in `decode_buffer'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-tcp-6.2.7-java/lib/logstash/inputs/tcp/decoder_impl.rb:22:in `decode'"], :original_data=>"955.0 atz=Europe/Moscow at=sdkmultifolderreader dvchost=hostname-db3 dvc=10.0.0.1 deviceZoneURI=/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255 dtz=Europe/Moscow geid=0 _cefVer=0.1 aid=3fh2sPoEBABCUfDP6qAyT2w\\=\\="}
logstash       | 1689876227
logstash       | 1689876253
logstash       | 1689876254

A Unix timestamp in the log means a successful parse. I emphasize: it's the same input and the same pipelines, just several launches.

What makes you think the bug is in logstash rather than the pipeline testing tool you are using?

Because I get the same errors in my real ELK stack, and likewise not for all input strings. I wrote about this in the first post. I just can't screenshot them, because they contain too much confidential info.

Please pay attention to the string in the broken-parsing error case. The input string was somehow divided in half.

Screenshot_1

And where was it divided in half? Logstash will not do that unless you configure a split filter.

Did you check the source message, in your Kafka or whatever is writing to it, to see whether the message wasn't already divided in half there?

You can edit your pipeline to add a tag to messages that do not have AIX Audit PR in them, meaning they were divided before Logstash, so you would be able to validate it.

From what you shared it doesn't look like an issue with Logstash; the CEF codec failed because the message is not a valid CEF message.

I don't know what your original source is, but I had a similar issue in the past with a Cisco network device that was configured to break messages into 1024-byte chunks. This was configurable on the device; not sure if this is similar to your case.

That is exactly the case: Logstash does divide the message for no reason. Please look at the input strings I provided in the first post. It shouldn't, but it did.

Yes, I did check the input from Kafka. There is one more detail I missed: without this pipe-to-pipe processing it works fine, except for the mess with the timestamps.

I don't think it is Logstash that is dividing the message. I just replicated your pipelines and got no errors.

I used this message:

CEF:0|IBM|AIX Audit PR||PROC_SetUserIDs|PROC_SetUserIDs|Low| eventId=1856831065 msg=orarootagent.bin 1642298 50072098 effect: 1000, real: 1000, saved: -1, login: -1 start=-62086925465000 end=-62086925465000 categorySignificance=/Informational categoryBehavior=/Modify/Attribute categoryDeviceGroup=/Application catdt=Operating System categoryOutcome=/Attempt categoryObject=/Host/Resource/Process art=1689839632340 deviceSeverity=OK act=OK rt=-62086925465000 suser=user_ru1 sproc=2 sourceServiceName=0 flexString1=20230720105200 cn2=3 cs1Label=ACL cs2Label=Group cs3Label=Owner cs4Label=Reason or Error Code cs5Label=PCL cs6Label=Volume Group ID cn1Label=File Descriptor cn2Label=Parent PID cn3Label=Physical Volume Index ahost=host-spb-best.sims.ru agt=10.0.0.2 agentZoneURI=/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255 amac=00-50-56-B9-68-28 av=8.4.0.8955.0 atz=Europe/Moscow at=sdkmultifolderreader dvchost=hostname-db3 dvc=10.0.0.1 deviceZoneURI=/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255 dtz=Europe/Moscow geid=0 _cefVer=0.1 aid=3fh2sPoEBABCUfDP6qAyT2w==

From what you shared your message is being divided here:

955.0 atz=Europe/Moscow at=sdkmultifolderreader dvchost=hostname-db3 dvc=10.0.0.1 deviceZoneURI=/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255 dtz=Europe/Moscow geid=0 _cefVer=0.1 aid=3fh2sPoEBABCUfDP6qAyT2w==

I used the same pipelines that you shared; the only changes are that the input of the in pipeline was changed to stdin, and the output of the out pipeline was changed to file.

I got just one event, without any errors. This is the result:

{
  "deviceAddress": "10.0.0.1",
  "deviceSeverity": "OK",
  "name": "PROC_SetUserIDs",
  "agentVersion": "8.4.0.8955.0",
  "host": "localhost",
  "deviceCustomNumber3Label": "Physical Volume Index",
  "startTime": "1689943543",
  "categoryObject": "/Host/Resource/Process",
  "agentId": "3fh2sPoEBABCUfDP6qAyT2w==",
  "deviceCustomNumber2": "3",
  "deviceZoneURI": "/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255",
  "deviceCustomNumber1Label": "File Descriptor",
  "deviceCustomNumber2Label": "Parent PID",
  "deviceCustomString4Label": "Reason or Error Code",
  "agentReceiptTime": "1689839632340",
  "message": "orarootagent.bin                1642298  50072098          effect: 1000, real: 1000, saved: -1, login: -1",
  "severity": "Low",
  "categoryOutcome": "/Attempt",
  "deviceCustomString5Label": "PCL",
  "categoryBehavior": "/Modify/Attribute",
  "event": {
    "start": "2023-07-21T12:45:44.104475Z",
    "end": "2023-07-21T12:45:44.104662Z"
  },
  "sourceUserName": "user_ru1",
  "deviceCustomString6Label": "Volume Group ID",
  "agentZoneURI": "/All Zones/System/Private Address Space Zones/RFC1918: 10.0.0.0-10.255.255.255",
  "flexString1": "20230720105200",
  "eventId": "1856831065",
  "agentAddress": "10.0.0.2",
  "agentMacAddress": "00-50-56-B9-68-28",
  "cefVersion": "0",
  "sourceProcessName": "2",
  "@version": "1",
  "deviceCustomString3Label": "Owner",
  "deviceHostName": "hostname-db3",
  "_cefVer": "0.1",
  "deviceAction": "OK",
  "endTime": "1689943543",
  "categoryDeviceGroup": "/Application",
  "@timestamp": "2023-07-21T12:45:43.997510969Z",
  "geid": "0",
  "port": 38132,
  "deviceReceiptTime": "1689943543",
  "deviceVersion": "",
  "deviceCustomString1Label": "ACL",
  "agentType": "sdkmultifolderreader",
  "categorySignificance": "/Informational",
  "deviceProduct": "AIX Audit PR",
  "categoryDeviceType": "Operating System",
  "deviceCustomString2Label": "Group",
  "sourceServiceName": "0",
  "agentHostName": "host-spb-best.sims.ru",
  "deviceTimeZone": "Europe/Moscow",
  "deviceEventClassId": "PROC_SetUserIDs",
  "deviceVendor": "IBM",
  "agentTimeZone": "Europe/Moscow"
}

As you can see, Logstash didn't divide the message; the message and its content were correctly parsed.

Can you replicate the same behavior of Logstash dividing the message on a local Logstash, not on that test application? Also, can you share the error logs from your production, with the sensitive information redacted? Just the reason why it had a CEF parse failure would help.

The problem appears only when the two messages are sent at once, and not every time.

Sure, I will try and return with the results. Thanks.

Hi, Leandro! Hope you're fine.

I've managed to replicate the error, but not exactly.
The first screenshot is what you asked for, from the real ELK stack.
Second: an error from the real ELK stack.
Third: my local test ELK. Pay attention to the Message field. But there were no errors and no _cefparsefailure tag, which makes me wonder. I will try harder.



Yep, there it is!

Is there any more information I can provide?

Those screenshots do not have any extra information; they only show that the CEF parsing failed, which is expected since the messages are not valid CEF messages. This is already known.

The third screenshot also shows no issue; it will not have a _cefparsefailure tag because it is a valid CEF message.

The last screenshot just says that Logstash received a message containing just an e, which of course is not a valid CEF message.

How did you replicate it? What sources did you use? What tools did you use? There is still no extra information that would confirm it is Logstash that is dividing the messages.

The main issue here is that some messages are being divided, and this will cause parse errors, as the divided part is not a valid CEF message. But Logstash does not divide messages, so something else in your ingestion flow is causing this.

I used Elasticsearch/Kibana 8.7.1 and Logstash 8.4.0, as in our real ELK stack, but without Kafka as the source. All of this was installed on my local machine on Ubuntu in WSL. The CEF was generated by a simple Python script like this one: CEF Event Generator – Frank Cardinale, which just wrote CEF strings into a file at /var/log/logstash/cef.log. CEF example:

CEF:0|Seamless Security Solutions|Streamlined Security Product|1.0|1000|Authentication Event|10|src=167.0.0.51 dst=10.0.0.51 duser=Chris msg=Login_Success

Logstash configs.
pipelines.yml:

- pipeline.id: cef_correction_1st_stage
  path.config: "/etc/logstash/conf.d/cef_in.conf"
- pipeline.id: cef_correction_2nd_stage
  path.config: "/etc/logstash/conf.d/cef_out.conf"

cef_in.conf:

input {
  file {
    path => "/var/log/logstash/cef.log"
    start_position => "beginning"
  }
}

filter {
    ruby {
      code => '
        event.set("current_time", Time.now().to_i);
        puts Time.now().to_i
        '
    }
    mutate {
      gsub => ["message", "-6208[0-9]+", "%{current_time}"]
    }
  mutate {
    add_field => {
        "cefField" => "%{message}"
      }
  }
}

output {
    tcp {
        host => localhost
        port => 12888
        codec => plain {
            format => "%{cefField}"
        }
    }
}

cef_out.conf:

input {
    tcp {
        host => localhost
        port => 12888
        codec => cef {}
    }
}


filter {
  ruby {
    code => "
      event.set('[event][start]', Time.now());
      event.set('[event][end]', Time.now());
    "
  }
}

output {
   elasticsearch {
       hosts => ["https://localhost:9200"]
       http_compression => true
       index => "test_index"
       user => "${es_login}"
       password => "${es_pwd}"
       ssl => true
       ssl_certificate_verification => false
   }
}

The error in the Logstash log was the standard error for such a case:

[2023-07-24T13:13:33,375][ERROR][logstash.codecs.cef      ] Failed to decode CEF payload. Generating failure event with payload in message field. {:exception=>NoMethodError, :message=>"undefined method `include?' for nil:NilClass", :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-codec-cef-6.2.5-java/lib/logstash/codecs/cef.rb:241:in `handle'", "/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-codec-cef-6.2.5-java/lib/logstash/codecs/cef.rb:200:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-input-tcp-6.3.0-java/lib/logstash/inputs/tcp.rb:194:in `decode_buffer'", "/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-input-tcp-6.3.0-java/lib/logstash/inputs/tcp/decoder_impl.rb:22:in `decode'"], :original_data=>"e"}

And that's all.

Are you using a redirect in rsyslog?

So, the message with just the e and the _cefparsefailure is from this script you used, right?

Can you check the file /var/log/logstash/cef.log and see if you have any lines with just an e?

No, Python writes into the file and Logstash reads it.

I'm not sure what you mean. There are no incorrect lines in the file.
You can check it, if you want to: DropMeFiles – free one-click file sharing service

Oh! I think you are right in this specific case; I was doing some tests and was able to replicate your issue.

This is even in the documentation.

I just created a very large message, and the CEF codec split it into multiple messages.
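That split is consistent with how TCP works: it delivers a byte stream, not discrete messages, so without an explicit delimiter a receiver may hand the codec a read buffer that ends mid-message. A minimal Ruby sketch of why framing matters (no sockets, just strings, and deliberately short made-up messages):

```ruby
# Two messages written back-to-back into one TCP byte stream.
msg1 = "CEF:0|Vendor|Product|1.0|100|First event|5|msg=one"
msg2 = "CEF:0|Vendor|Product|1.0|100|Second event|5|msg=two"
stream = msg1 + msg2

# A receiver pulling fixed-size chunks can end a read mid-message,
# so the first chunk is not a complete, parseable message.
chunks = stream.scan(/.{1,40}/m)

# With newline framing, the message boundaries are unambiguous
# and the original messages are recovered exactly.
framed = msg1 + "\n" + msg2 + "\n"
messages = framed.each_line.map(&:chomp)
# messages == [msg1, msg2]
```

The chunk size of 40 here stands in for whatever buffer size the receiving side happens to use; the point is only that arbitrary read boundaries need a delimiter to be reassembled.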

Try to change your codec in the tcp input to this:

        codec => cef {
          delimiter => "\n"
        }

Aha, I have tried this in the test tool and it didn't work at all. I will try it on my local Ubuntu.

The delimiter doesn't work with "\n", "\r", or both. Just like in the test tool, the local Logstash simply does not send anything to Elastic. In the time period between the two arrows I tried different settings for the 'delimiter' parameter.
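One possible explanation for the delimiter not helping (an assumption on my part, not verified against these exact plugin versions): the plain codec's format option does not append a trailing newline, so if the first stage never writes "\n", the second stage's delimiter => "\n" has nothing to match on and the codec just keeps buffering. A sketch of the first-stage output using the line codec instead, which appends its delimiter (newline by default) after each event:

```
output {
    tcp {
        host => localhost
        port => 12888
        codec => line {            # appends "\n" after each event
            format => "%{cefField}"
        }
    }
}
```

If this assumption holds, the sender's newline plus the receiver's delimiter => "\n" together give the cef codec unambiguous message boundaries.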

There is an error:

[2023-07-24T17:50:02,861][WARN ][logstash.outputs.tcp     ] client socket failed: {:host=>"localhost", :port=>12888, :socket=>"#<TCPSocket:0x7d79069b>", :message=>"End of file reached", :exception=>EOFError}