Alright, I've done some more research on this. This is my only Beats input pipeline, and within it I split off and parse seven different log sources. They all end up in a variety of indices in Elasticsearch, with a catch-all index at the end. The json filter is only called once in this pipeline. With filter debugging enabled (as defined here) and pipeline.separate_logs: true set in my logstash.yml, the below gets logged in the beats pipeline log.
[DEBUG][logstash.filters.json ] Running json filter {:event=>#<LogStash::Event:0xb7d5666>}
[DEBUG][logstash.filters.json ] Event after json filter {:event=>#<LogStash::Event:0x293d386d>}
The below gets logged in logstash-plain.log
[2020-11-25T09:33:05,914][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@source = "privacyidea_message"
[2020-11-25T09:33:06,227][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@id = "fbfef07ebe7f412de7b8b68d807e971d25b17c19168877ebfa99ff36e17b788e"
[2020-11-25T09:33:06,367][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@enable_metric = true
[2020-11-25T09:33:06,388][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@add_tag = []
[2020-11-25T09:33:06,423][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@remove_tag = []
[2020-11-25T09:33:06,453][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@add_field = {}
[2020-11-25T09:33:06,492][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@remove_field = []
[2020-11-25T09:33:06,529][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@periodic_flush = false
[2020-11-25T09:33:06,537][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@tag_on_failure = ["_jsonparsefailure"]
[2020-11-25T09:33:06,537][DEBUG][logstash.filters.json ] config LogStash::Filters::Json/@skip_on_invalid_json = false
Pipeline Configuration - I've removed the parts of the pipeline that don't affect these logs (the full pipeline is ~700 lines).
input {
beats {
id => "Beats Input"
port => 5044
include_codec_tag => false
}
}
filter {
# preceding branches removed for brevity, hence the dangling else if
else if [@metadata][beat] == "filebeat" {
if [log][source] == "privacyidea" {
fingerprint {
id => "PrivacyIDEA Duplication Protection"
source => "message"
target => "[@metadata][fingerprint]"
method => "MURMUR3"
}
#Drop Python traceback lines
if [message] =~ /^(Traceback|KeyError|\s)/ {
drop { }
}
#JSON-encode non-JSON formatted messages
if [message] =~ /^\[.*?\]\[.*?\]\[.*?\]\w.*/ {
mutate {
id => "PrivacyIDEA JSON Encode messages"
gsub => [
"message", '(\[.*?\]\[.*?\]\[.*?\])(.*)', '\1{"info": "\2"}'
]
}
}
#Normalize message
mutate {
id => "PrivacyIDEA lowercase message"
lowercase => [ "message" ]
}
#Parse syslog formatted fields out
grok {
id => "PrivacyIDEA syslog parsing"
match => ["message", "\[%{TIMESTAMP_ISO8601:time}]\[%{LOGLEVEL:log_level}]\[%{SYSLOGPROG:module}]%{GREEDYDATA:privacyidea_message}"]
}
json {
source => "privacyidea_message"
}
}
}
}
output {
# preceding branches removed for brevity, hence the dangling else if
else if [@metadata][beat] == "filebeat" {
if [log][source] == "privacyidea" {
elasticsearch {
id => "PrivacyIDEA Output"
hosts => "ElasticsearchServer:9200"
document_id => "%{[@metadata][fingerprint]}"
index => "privacyidea-%{+YYYY.MM}"
}
}
else {
elasticsearch {
id => "Filebeat Out"
cacert => "D:/Logstash/config/certs/realbundle.pem"
user => "elastic"
password => "<redacted>"
hosts => "https://ssselklog.stlouisco.net:9200"
ilm_rollover_alias => "filebeat"
ilm_pattern => "{now/d}-000001"
ilm_policy => "Delete_After_30D"
template => "D:/Logstash/templates/filebeattemplate.json"
template_name => "filebeat"
template_overwrite => true
}
}
}
}
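For anyone following along, the filter chain above is easy to dry-run outside Logstash. Below is a rough Python sketch of the fingerprint → gsub → lowercase → grok → json steps; the sample line is hypothetical (same shape as the real pi-audit lines further down), sha1 is only a stdlib stand-in for MURMUR3, and the regexes are simplified stand-ins for the grok pattern, not Logstash internals:

```python
import hashlib
import json
import re

# Hypothetical pi-audit line in the same shape as the real ones quoted below.
line = '[2020-11-25 09:32:58,952][INFO][pi-audit]{"action": "POST /validate/check", "success": false}'

# Stand-in for the fingerprint filter (method => "MURMUR3"); MURMUR3 isn't in
# the Python stdlib, so sha1 just illustrates the stable-id-per-message idea.
doc_id = hashlib.sha1(line.encode("utf-8")).hexdigest()

# The "PrivacyIDEA JSON Encode messages" gsub only fires when the text after
# the third bracket group starts with a word character, i.e. is NOT already JSON.
if re.match(r'^\[.*?\]\[.*?\]\[.*?\]\w', line):
    line = re.sub(r'(\[.*?\]\[.*?\]\[.*?\])(.*)', r'\1{"info": "\2"}', line)

# The "PrivacyIDEA lowercase message" mutate (this is why the ingested event
# shows a lowercase "t" in the timestamp and "post" in the action).
line = line.lower()

# Rough stand-in for the grok: peel the three bracketed headers off the payload.
time, log_level, module, privacyidea_message = re.match(
    r'^\[([^\]]*)\]\[([^\]]*)\]\[([^\]]*)\](.*)$', line).groups()

# What the json filter should then do with that payload.
event = json.loads(privacyidea_message)
print(event)  # {'action': 'post /validate/check', 'success': False}
```

Run end to end, every step behaves and the final json.loads parses cleanly, which is what makes the silent json filter in the real pipeline so puzzling.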
Removing the json filter from my pipeline, the below event gets ingested into Elasticsearch. I included a couple of different fields to show what the event looks like.
message field
[2020-11-25 09:32:58,952][info][pi-audit]{"action": "post /validate/check", "action_detail": "", "client": "192.168.1.1", "client_user_agent": null, "info": "err905: missing parameter: 'pass'", "policies": "", "privacyidea_server": "contoso", "success": false, "timestamp": "2020-11-25t15:32:58.952025"}
privacyidea_message field
{"action": "post /validate/check", "action_detail": "", "client": "192.168.1.1", "client_user_agent": null, "info": "err905: missing parameter: 'pass'", "policies": "", "privacyidea_server": "contoso", "success": false, "timestamp": "2020-11-25t15:32:58.952025"}
_source field
agent.id:369a6767-44ee-4bc3-abd9-cd168e3e9849 agent.version:7.10.0 agent.ephemeral_id:7ce8f4f1-5d97-4295-bec3-e97bf2f629f4 agent.hostname:contoso agent.type:filebeat agent.name:contoso message:[2020-11-25 09:32:58,952][info][pi-audit]{"action": "post /validate/check", "action_detail": "", "client": "192.168.1.1", "client_user_agent": null, "info": "err905: missing parameter: 'pass'", "policies": "", "privacyidea_server": "contoso", "success": false, "timestamp": "2020-11-25t15:32:58.952025"} log_level:info ecs.version:1.6.0 host.name:contoso log.source:privacyidea log.file.path:/var/log/privacyidea/audit.log log.offset:20269333 module:pi-audit @version:1 privacyidea_message:{"action": "post /validate/check", "action_detail": "", "client": "192.168.1.1", "client_user_agent": null, "info": "err905: missing parameter: 'pass'", "policies": "", "privacyidea_server": "contoso", "success": false, "timestamp": "2020-11-25t15:32:58.952025"} program:pi-audit time:2020-11-25 09:32:58,952 input.type:log @timestamp:Nov 25, 2020 @ 09:32:59.262 _id:557983225 _index:privacyidea-2020.11
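As a sanity check, that exact privacyidea_message value is valid JSON, which is easy to confirm outside Logstash (Python used here purely as a JSON checker). So if the json filter were choking on the payload, with skip_on_invalid_json => false I'd expect at least a _jsonparsefailure tag rather than nothing at all:

```python
import json

# The privacyidea_message value exactly as ingested above.
privacyidea_message = (
    '{"action": "post /validate/check", "action_detail": "", '
    '"client": "192.168.1.1", "client_user_agent": null, '
    '"info": "err905: missing parameter: \'pass\'", "policies": "", '
    '"privacyidea_server": "contoso", "success": false, '
    '"timestamp": "2020-11-25t15:32:58.952025"}'
)

parsed = json.loads(privacyidea_message)  # no exception -> valid JSON
print(sorted(parsed))  # the keys the json filter would add to the event
```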