I'm using Logstash 7.4.
I would like to try dissecting my logs (in this example, Apache logs) with multiple patterns, each possibly with its own mutations (renaming, formatting, etc.).
I came up with the configuration below, but I'm not sure about it. Could you tell me whether this is good practice, or whether I should organize my code differently?
Thank you very much in advance!
input {
  syslog {
    port => 514
  }
}
filter {
  if "found" not in [tags] {
    dissect {
      mapping => { "message" => '%{saddr} %{fld5} %{username} [%{fld7} %{timezone}] "%{web_method} %{webpage} %{network_service}" %{resultcode} %{sbytes}' }
      add_tag => [ "found" ]
      remove_tag => [ "_dissectfailure" ]
      remove_field => [ "message" ]
    }
    if "_dissectfailure" not in [tags] {
      date {
        match => ["fld7", "dd/MMM/yyyy:HH:mm:ss"]
      }
      mutate {
        ...
      }
    }
  }
}
filter {
  if "found" not in [tags] {
    dissect {
      mapping => { "message" => '%{saddr} %{fld5} %{username} [%{fld7} %{timezone}] "%{web_method} "%{web_host}" "%{webpage}" "%{web_query}" %{network_service}" %{resultcode} %{sbytes} "%{web_referer}" "%{user_agent}" "%{web_cookie}"' }
      add_tag => [ "found" ]
      remove_tag => [ "_dissectfailure" ]
      remove_field => [ "message" ]
    }
    if "_dissectfailure" not in [tags] {
      date {
        match => ["fld7", "dd/MMM/yyyy:HH:mm:ss"]
      }
      mutate {
        ...
      }
    }
  }
}
output {
  if "_dissectfailure" in [tags] {
    file { path => "failed_logs-apache-%{+YYYY-MM-dd}" }
  } else {
    elasticsearch {
      hosts => ["https://elasticxxxxxx"]
      index => "apache-%{+YYYY.MM.dd}"
    }
  }
}
In general, this seems like a sensible approach, but there are a few "gotchas" to look out for.
I notice that the two example sections you shared have a common prefix; if this is the case for all of the logs being processed by your pipeline, it may make sense to "peel" this layer off once, stashing the rest for future parsing. This allows you to have a single section in your pipelines that is dedicated to handling the common prefix, and is also more efficient because the pipeline only has to do the work of parsing that prefix once per event.
input {
  syslog {
    port => 514
  }
}
# split off the common prefix from `message`,
# leaving the unparsed remainder in its place
filter {
  dissect {
    mapping => { "message" => '%{saddr} %{fld5} %{username} [%{fld7} %{timezone}] %{message}' }
    remove_tag => [ "_dissectfailure" ]
  }
  if "_dissectfailure" not in [tags] {
    date {
      match => [ "fld7", "dd/MMM/yyyy:HH:mm:ss"]
      timezone => "%{timezone}"
    }
  }
}
filter {
  if "found" not in [tags] {
    # ...
  }
}
As a side note, I would advise using a standard date format (e.g., ISO8601) if you have control over the shape of the data. MMM is especially sensitive to locale: month abbreviations only match when the locale of the machine, or the locale explicitly configured on the parser, agrees with the data (e.g., the German abbreviation Mär will only match under a German locale).
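If the source format can't be changed, one option is to pin the locale on the date filter itself rather than rely on the machine's default. A minimal sketch, assuming the month abbreviations in the data are English (Jan, Feb, ...), as is usual for Apache logs; adjust the language tag if your data differs:

filter {
  date {
    match => [ "fld7", "dd/MMM/yyyy:HH:mm:ss" ]
    # assumption: month names in the logs are English abbreviations
    locale => "en"
  }
}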
Because the Dissect filter concerns itself with the shape of the separators between the fields, and not the shape of the fields themselves, it can easily find "false positives", which would cause an event to be mapped by one or more filters that weren't intended for it. This is especially likely when two kinds of events have the same number of fields, separated in a consistent manner, but meaning different things.
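One way to catch such false positives is to check the shape of a parsed field after dissecting, before treating the event as matched. A rough sketch, assuming resultcode should always be a three-digit HTTP status; the tag name _shape_mismatch is made up for illustration and is not something Logstash sets on its own:

filter {
  # "found" is the tag added by the dissect blocks in the original configuration
  if "found" in [tags] and [resultcode] !~ /^[0-9]{3}$/ {
    mutate {
      # hypothetical tag, so mismatched events can be routed separately in the output
      add_tag => [ "_shape_mismatch" ]
    }
  }
}

The same idea can be applied to any field whose shape is known in advance (an IP address in saddr, a number in sbytes). It also suggests trying the more specific pattern before the more general one, since the last field of a dissect mapping takes whatever remains of the line.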
Thank you very much @yaauie,
your advice is very valuable!
I'm unsure about the common prefix (because this logstash configuration is automatically generated), but that could definitely be an area for optimization.
I hesitated a lot between dissect and grok... but I don't control the source format (it's mostly security devices), so I can't specify the date format or timezone...
Thanks again!