We use the default Apache error Elasticsearch ingest pipeline, but we have a requirement that needs further parsing: we need an additional grok stage applied to the message field.
How can we perform grok parsing only when the field event.type is "error"?
This is the Logstash equivalent:
# Run the grok filter only for events whose [event][type] contains "error".
if "error" in [event][type] {
grok {
# Stop trying patterns after the first one that matches.
break_on_match => true
match => {
# Two alternative layouts of the Apache error "message" field:
# 1) ref-id, description, HTTP method/request (optionally with HTTP version)
# 2) ref-id, description, file path
"message" => [
"%{DATA:ApacheErrorRefId}: %{GREEDYDATA:ErrorDescription} %{DATA:httpmethod} %{GREEDYDATA:request}(?: HTTP/%{NUMBER:httpversion})",
"%{DATA:ApacheErrorRefId}: %{GREEDYDATA:ErrorDescription}: %{GREEDYDATA:filepath}"
]
}
}
}
I am not familiar with Elasticsearch ingest processors, so I don't know how to start.
{
  "filebeat-7.8.0-apache-error-pipeline": {
    "description": "Pipeline for parsing apache error logs",
    "processors": [
      {
        "grok": {
          "ignore_missing": true,
          "field": "message",
          "patterns": [
            "\\[%{APACHE_TIME:apache.error.timestamp}\\] \\[%{LOGLEVEL:log.level}\\]( \\[client %{IPORHOST:source.address}(:%{POSINT:source.port})?\\])? %{GREEDYDATA:message}",
            "\\[%{APACHE_TIME:apache.error.timestamp}\\] \\[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\\] \\[pid %{NUMBER:process.pid:long}(:tid %{NUMBER:process.thread.id:long})?\\]( \\[client %{IPORHOST:source.address}(:%{POSINT:source.port})?\\])? %{GREEDYDATA:message}"
          ],
          "pattern_definitions": {
            "APACHE_TIME": "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
          }
        }
      },
      {
        "date": {
          "field": "apache.error.timestamp",
          "target_field": "@timestamp",
          "formats": [
            "EEE MMM dd H:m:s yyyy",
            "EEE MMM dd H:m:s.SSSSSS yyyy"
          ],
          "on_failure": [
            {
              "append": {
                "field": "error.message",
                "value": "{{ _ingest.on_failure_message }}"
              }
            }
          ],
          "if": "ctx.event.timezone == null"
        }
      },
      {
        "date": {
          "on_failure": [
            {
              "append": {
                "field": "error.message",
                "value": "{{ _ingest.on_failure_message }}"
              }
            }
          ],
          "if": "ctx.event.timezone != null",
          "field": "apache.error.timestamp",
          "target_field": "@timestamp",
          "formats": [
            "EEE MMM dd H:m:s yyyy",
            "EEE MMM dd H:m:s.SSSSSS yyyy"
          ],
          "timezone": "{{ event.timezone }}"
        }
      },
      {
        "remove": {
          "ignore_failure": true,
          "field": "apache.error.timestamp"
        }
      },
      {
        "set": {
          "field": "event.kind",
          "value": "event"
        }
      },
      {
        "set": {
          "field": "event.category",
          "value": "web"
        }
      },
      {
        "script": {
          "source": "def err_levels = [\"emerg\", \"alert\", \"crit\", \"error\", \"warn\"]; if (err_levels.contains(ctx.log.level)) { ctx.event.type = \"error\"; } else { ctx.event.type = \"info\"; }",
          "if": "ctx?.log?.level != null",
          "lang": "painless"
        }
      },
      {
        "grok": {
          "if": "ctx?.event?.type == 'error'",
          "field": "message",
          "ignore_missing": true,
          "ignore_failure": true,
          "patterns": [
            "%{DATA:ApacheErrorRefId}: %{GREEDYDATA:ErrorDescription} %{DATA:httpmethod} %{GREEDYDATA:request}(?: HTTP/%{NUMBER:httpversion})",
            "%{DATA:ApacheErrorRefId}: %{GREEDYDATA:ErrorDescription}: %{GREEDYDATA:filepath}"
          ]
        }
      },
      {
        "grok": {
          "patterns": [
            "^(%{IP:source.ip}|%{HOSTNAME:source.domain})$"
          ],
          "field": "source.address",
          "ignore_missing": true
        }
      },
      {
        "geoip": {
          "target_field": "source.geo",
          "ignore_missing": true,
          "field": "source.ip"
        }
      },
      {
        "geoip": {
          "properties": [
            "asn",
            "organization_name"
          ],
          "ignore_missing": true,
          "database_file": "GeoLite2-ASN.mmdb",
          "field": "source.ip",
          "target_field": "source.as"
        }
      },
      {
        "rename": {
          "target_field": "source.as.number",
          "ignore_missing": true,
          "field": "source.as.asn"
        }
      },
      {
        "rename": {
          "field": "source.as.organization_name",
          "target_field": "source.as.organization.name",
          "ignore_missing": true
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "error.message",
          "value": "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}