Hi,
I need some help with configuration and design.
I’ve created a Python application that integrates with a specific source and downloads files that do not change. These files are saved on my local Ubuntu machine in a directory such as:
/home/test/logs/foldername/extracted/somename/filename
The application downloads multiple files and sometimes needs to extract them, which may result in files such as:
filename.log.debug
filename.log.debug.1
All files are encrypted using AES-256-CBC with PKCS7 padding.
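For reference, whole-file decryption of one of these files in Python would look roughly like the sketch below. The key source and the IV-prepended-to-ciphertext layout are placeholders for illustration, not my exact setup; the point is that CBC decryption and PKCS7 unpadding operate on the complete file at once:

import os
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

def decrypt_file(path: str, key: bytes) -> bytes:
    # Read the complete file: 16-byte IV (assumed prepended) followed by the ciphertext.
    with open(path, "rb") as f:
        blob = f.read()
    iv, ciphertext = blob[:16], blob[16:]

    # AES-256-CBC decryption over the full ciphertext.
    decryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor()
    padded = decryptor.update(ciphertext) + decryptor.finalize()

    # Strip PKCS7 padding (AES block size is 128 bits).
    unpadder = padding.PKCS7(128).unpadder()
    return unpadder.update(padded) + unpadder.finalize()

if __name__ == "__main__":
    key = bytes.fromhex(os.environ["LOG_AES_KEY_HEX"])  # placeholder: 32-byte key for AES-256
    print(decrypt_file("/home/test/logs/foldername/extracted/somename/filename", key)[:200])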
Once the files are saved in that directory, Filebeat picks them up. The configuration is somewhat complex, but here is an example:
filebeat.inputs:
  ############################# mgmt logs ################################
  - type: filestream
    id: mgmt-log-normal
    enabled: true
    paths:
      - "/home/test/cse_logs/*/extracted/normal/mgmt*"
    parsers:
      - multiline:
          type: pattern
          pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
          negate: true
          match: after
    fields:
      input_id: mgmt_log_normal

  - type: filestream
    id: mgmt-log-debug
    enabled: true
    paths:
      - "/home/test/cse_logs/*/extracted/debug/mgmt*"
    parsers:
      - multiline:
          type: pattern
          pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
          negate: true
          match: after
    fields:
      input_id: "mgmt_log_debug"
The logs are then sent to Logstash via a beats pipeline, configured as follows:
input {
  beats {
    port => 5044
    ssl_enabled => true
    ssl_certificate => "/usr/share/logstash/config/certs/logstash.crt"
    ssl_key => "/usr/share/logstash/config/certs/logstash.key"
  }
}

filter {
  if [fields][input_id] and [fields][input_id] != "" {
    mutate {
      add_field => { "[@metadata][target_pipeline]" => "%{[fields][input_id]}" }
    }
  } else {
    mutate {
      add_field => { "[@metadata][target_pipeline]" => "default" }
    }
  }
}

output {
  # Define individual conditions for each target pipeline
  if [@metadata][target_pipeline] == "mgmt_log_debug" {
    pipeline { send_to => "mgmt_log_debug" }
  } else if [@metadata][target_pipeline] == "mgmt_log_normal" {
    pipeline { send_to => "mgmt_log_normal" }
  } else {
    pipeline { send_to => "default" }
  }
  stdout { codec => rubydebug }
}
Eventually, the logs are sent to another pipeline, such as mgmt_log_debug, which processes and indexes them:
input {
  pipeline { address => "mgmt_log_debug" }
}

filter {
  if [fields][input_id] == "mgmt_log_debug" {
    grok {
      match => { "message" => "%{DATESTAMP:log_timestamp}%{SPACE}%{WORD:level}%{SPACE}%{WORD:app}%{SPACE}\[%{DATA:process}\]%{SPACE}\[%{DATA:thread}\]%{SPACE}%{DATA:class}%{SPACE}%{NUMBER:line}\:%{SPACE}%{GREEDYDATA:log_message}" }
    }
    mutate {
      add_field => { "class_line" => "%{class}:%{line}" }
    }
    date {
      match => [ "log_timestamp", "yy-MM-dd HH:mm:ss,SSS" ]
      target => "log_date"
    }
  }
  grok {
    match => { "[log][file][path]" => "/home/pentera/cse_logs/%{GREEDYDATA:parent_dir}/extracted/debug/%{GREEDYDATA:filename}" }
  }
  mutate {
    add_field => { "source" => "%{parent_dir}" }
  }
}

output {
  elasticsearch {
    hosts => '${ELASTICSEARCH_HOST_PORT}'
    index => "cse-%{source}"
    user => "${ELASTIC_USERNAME}"
    password => "${ELASTIC_PASSWORD}"
    ssl => true
    cacert => "/certs/ca.crt"
  }
  stdout { codec => rubydebug }
}
Each file is around 50 MB in size.
I attempted to add decryption in Logstash using the Cipher filter plugin as well as custom Ruby code, but neither works. Filebeat splits each file into events and sends them in batches, and even after tweaking the parsers the content still arrives in batches rather than as one complete file. As a result, Logstash never sees the full ciphertext and cannot decrypt the files properly.
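To make the failure mode concrete, here is a small self-contained illustration (dummy key and data, not my real setup) of why a fragment of a CBC-encrypted file cannot be decrypted on its own: every block depends on the previous ciphertext block, so an event carrying only a slice from the middle of a file decrypts to garbage unless the preceding block is also available.

import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

key = os.urandom(32)
iv = os.urandom(16)
plaintext = b"A" * 64  # four 16-byte blocks of dummy data

encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()
ciphertext = encryptor.update(plaintext) + encryptor.finalize()

# Decrypting blocks 3-4 works only if ciphertext block 2 is supplied as the IV:
dec = Cipher(algorithms.AES(key), modes.CBC(ciphertext[16:32])).decryptor()
print(dec.update(ciphertext[32:64]) + dec.finalize())  # b'AAAA...' (correct)

# The same fragment without that context decrypts its first block to garbage:
dec = Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor()
print(dec.update(ciphertext[32:64]) + dec.finalize())  # garbled first block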
Has anyone encountered a similar use case, or does anyone have suggestions on how I can fix this? Any guidance or assistance would be greatly appreciated.
Thank you!