Decryption of 3rd Party Logs Using Filebeat and Logstash

Hi,

I need some help with configuration and design.

I’ve created a Python application that integrates with a specific source and downloads files that do not change. These files are saved on my local Ubuntu machine in a directory such as:
/home/test/logs/foldername/extracted/somename/filename

The application downloads multiple files, and sometimes it needs to extract them. This may result in files like:
filename.log.debug and filename.log.debug.1.
All files are encrypted using AES-256-CBC with PKCS7 padding.
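
For context, decrypting one of these files outside the pipeline looks roughly like this in Python (a minimal sketch; the key and IV values below are placeholders for illustration, not how they are actually obtained):

from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

def decrypt_file(path: str, key: bytes, iv: bytes) -> bytes:
    """Decrypt a whole AES-256-CBC encrypted file and strip the PKCS7 padding."""
    with open(path, "rb") as f:
        ciphertext = f.read()

    # AES-256-CBC needs the complete ciphertext, a 32-byte key and a 16-byte IV
    decryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor()
    padded = decryptor.update(ciphertext) + decryptor.finalize()

    # strip the PKCS7 padding (block size is given in bits)
    unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
    return unpadder.update(padded) + unpadder.finalize()

# placeholder key/IV, for illustration only:
# plaintext = decrypt_file("/home/test/logs/foldername/extracted/somename/filename",
#                          key=b"\x00" * 32, iv=b"\x00" * 16)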

Once the files are saved in that directory, Filebeat picks them up. The configuration is somewhat complex, but here’s an example:

filebeat.inputs:

############################# mgmt logs ################################
- type: filestream
  id: mgmt-log-normal
  enabled: true
  paths:
    - "/home/test/cse_logs/*/extracted/normal/mgmt*"
  parsers:
    - multiline:
        type: pattern
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after
  fields:
    input_id: mgmt_log_normal

- type: filestream
  id: mgmt-log-debug
  enabled: true
  paths:
    - "/home/test/cse_logs/*/extracted/debug/mgmt*"
  parsers:
    - multiline:
        type: pattern
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after
  fields:
    input_id: "mgmt_log_debug"

The logs are then sent to Logstash via the Beats input, in a pipeline configured as follows:

input {
  beats {
    port => 5044
    ssl_enabled => true
    ssl_certificate => "/usr/share/logstash/config/certs/logstash.crt"
    ssl_key => "/usr/share/logstash/config/certs/logstash.key"
  }
}

filter {
  if [fields][input_id] and [fields][input_id] != "" {
    mutate {
      add_field => { "[@metadata][target_pipeline]" => "%{[fields][input_id]}" }
    }
  } else {
    mutate {
      add_field => { "[@metadata][target_pipeline]" => "default" }
    }
  }
}

output {
  # Define individual conditions for each target pipeline
  if [@metadata][target_pipeline] == "mgmt_log_debug" {
    pipeline { send_to => "mgmt_log_debug" }
  } else if [@metadata][target_pipeline] == "mgmt_log_normal" {
    pipeline { send_to => "mgmt_log_normal" }
  } else {
    pipeline { send_to => "default" }
  }
  stdout { codec => rubydebug }
}

Eventually, the logs are sent to another pipeline, such as mgmt_log_debug, which processes and indexes the logs:

input {
  pipeline { address => "mgmt_log_debug" }
}

filter {
    if  [fields][input_id] == "mgmt_log_debug" {
        grok {
            match => { "message" => "%{DATESTAMP:log_timestamp}%{SPACE}%{WORD:level}%{SPACE}%{WORD:app}%{SPACE}\[%{DATA:process}\]%{SPACE}\[%{DATA:thread}\]%{SPACE}%{DATA:class}%{SPACE}%{NUMBER:line}\:%{SPACE}%{GREEDYDATA:log_message}" }
        }
        mutate {
            add_field => { "class_line" => "%{class}:%{line}" }
        }
        date {
            match => [ "log_timestamp", "yy-MM-dd HH:mm:ss,SSS" ]
            target => "log_date"
        }
    }
    grok {
        match => { "[log][file][path]" => "/home/pentera/cse_logs/%{GREEDYDATA:parent_dir}/extracted/debug/%{GREEDYDATA:filename}" }
    }
    mutate {
        add_field => { "source" => "%{parent_dir}" }
    }
}

output {
    elasticsearch {
        hosts => '${ELASTICSEARCH_HOST_PORT}'
        index => "cse-%{source}"
        user => "${ELASTIC_USERNAME}"
        password => "${ELASTIC_PASSWORD}"
        ssl => true
        cacert => "/certs/ca.crt"
    }
    stdout { codec => rubydebug }
}

Each file is around 50 MB in size.

I attempted to add decryption using the Cipher filter plugin and also with custom Ruby code, but neither works. Filebeat seems to split each file into multiple events and send them in batches, and even after tweaking the parsers to avoid splitting, the content still arrives in pieces. As a result, Logstash never sees the complete ciphertext and is unable to decrypt the files properly.
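
For reference, my cipher filter attempt looked roughly like this (a sketch only; CIPHER_KEY is a placeholder, and it assumes the whole base64-encoded ciphertext, with a 16-byte IV prepended as iv_random_length expects, arrives in a single message field, which is exactly what does not happen once the file is split into line events):

filter {
  cipher {
    mode             => "decrypt"
    algorithm        => "aes-256-cbc"
    key              => "${CIPHER_KEY}"   # placeholder, 32-byte key for AES-256
    key_size         => 32
    iv_random_length => 16                # assumes the 16-byte IV is prepended to the ciphertext
    base64           => true              # assumes the payload is base64-encoded
    source           => "message"
    target           => "message"
  }
}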

Has anyone encountered a similar use case or have any suggestions on what I can do to fix this? Any guidance or assistance would be greatly appreciated.

Thank you!

Can you share an example of what your files look like?

Both Filebeat and Logstash are line-oriented, so each line break will represent a new event.

It seems that you need to read the entire 50 MB file and send it as a single event?
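
If so, one (untested) option might be a multiline parser whose pattern never matches anything in the file, so Filebeat folds every line into a single event, something like:

- type: filestream
  id: mgmt-log-debug-wholefile
  enabled: true
  paths:
    - "/home/test/cse_logs/*/extracted/debug/mgmt*"
  parsers:
    - multiline:
        type: pattern
        # a pattern that never occurs in the files, so nothing ever starts a new event
        pattern: '^THIS_WILL_NEVER_MATCH$'
        negate: true
        match: after
        # the default of 500 lines is far too small for a 50 MB file
        max_lines: 2000000
        timeout: 10s

You would also need to raise the other size limits (if I remember correctly there is a message size cap of around 10 MB by default), and a single 50 MB event may still be awkward to push through Beats, Logstash and Elasticsearch.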