Conditional grok parsing using elasticsearch processor pipeline

We use the default Apache error Elasticsearch ingest pipeline, but we have a requirement that needs further parsing: we need an additional grok parse of the message field.

How can we perform grok parsing only when the field event.type is "error"?
This is the logstash equivalent

if "error" in [event][type] {
	grok {
            break_on_match => true
            match => { 
				"message" => [
					"%{DATA:ApacheErrorRefId}: %{GREEDYDATA:ErrorDescription} %{DATA:httpmethod} %{GREEDYDATA:request}(?: HTTP/%{NUMBER:httpversion})",
					"%{DATA:ApacheErrorRefId}: %{GREEDYDATA:ErrorDescription}: %{GREEDYDATA:filepath}"
				]
			}
        }
}

I am not familiar with Elasticsearch ingest processors, so I don't have any idea how to start.

{
  "filebeat-7.8.0-apache-error-pipeline" : {
    "description" : "Pipeline for parsing apache error logs",
    "processors" : [
      {
        "grok" : {
          "ignore_missing" : true,
          "field" : "message",
          "patterns" : [
            "\\[%{APACHE_TIME:apache.error.timestamp}\\] \\[%{LOGLEVEL:log.level}\\]( \\[client %{IPORHOST:source.address}(:%{POSINT:source.port})?\\])? %{GREEDYDATA:message}",
            "\\[%{APACHE_TIME:apache.error.timestamp}\\] \\[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\\] \\[pid %{NUMBER:process.pid:long}(:tid %{NUMBER:process.thread.id:long})?\\]( \\[client %{IPORHOST:source.address}(:%{POSINT:source.port})?\\])? %{GREEDYDATA:message}"
          ],
          "pattern_definitions" : {
            "APACHE_TIME" : "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
          }
        }
      },
      {
        "date" : {
          "field" : "apache.error.timestamp",
          "target_field" : "@timestamp",
          "formats" : [
            "EEE MMM dd H:m:s yyyy",
            "EEE MMM dd H:m:s.SSSSSS yyyy"
          ],
          "on_failure" : [
            {
              "append" : {
                "field" : "error.message",
                "value" : "{{ _ingest.on_failure_message }}"
              }
            }
          ],
          "if" : "ctx.event.timezone == null"
        }
      },
      {
        "date" : {
          "on_failure" : [
            {
              "append" : {
                "field" : "error.message",
                "value" : "{{ _ingest.on_failure_message }}"
              }
            }
          ],
          "if" : "ctx.event.timezone != null",
          "field" : "apache.error.timestamp",
          "target_field" : "@timestamp",
          "formats" : [
            "EEE MMM dd H:m:s yyyy",
            "EEE MMM dd H:m:s.SSSSSS yyyy"
          ],
          "timezone" : "{{ event.timezone }}"
        }
      },
      {
        "remove" : {
          "ignore_failure" : true,
          "field" : "apache.error.timestamp"
        }
      },
      {
        "set" : {
          "field" : "event.kind",
          "value" : "event"
        }
      },
      {
        "set" : {
          "field" : "event.category",
          "value" : "web"
        }
      },
      {
        "script" : {
          "source" : """def err_levels = ["emerg", "alert", "crit", "error", "warn"]; if (err_levels.contains(ctx.log.level)) {
  ctx.event.type = "error";
} else {
  ctx.event.type = "info";
}""",
          "if" : "ctx?.log?.level != null",
          "lang" : "painless"
        }
      },
      {
        "grok" : {
          "patterns" : [
            "^(%{IP:source.ip}|%{HOSTNAME:source.domain})$"
          ],
          "field" : "source.address",
          "ignore_missing" : true
        }
      },
      {
        "geoip" : {
          "target_field" : "source.geo",
          "ignore_missing" : true,
          "field" : "source.ip"
        }
      },
      {
        "geoip" : {
          "properties" : [
            "asn",
            "organization_name"
          ],
          "ignore_missing" : true,
          "database_file" : "GeoLite2-ASN.mmdb",
          "field" : "source.ip",
          "target_field" : "source.as"
        }
      },
      {
        "rename" : {
          "target_field" : "source.as.number",
          "ignore_missing" : true,
          "field" : "source.as.asn"
        }
      },
      {
        "rename" : {
          "field" : "source.as.organization_name",
          "target_field" : "source.as.organization.name",
          "ignore_missing" : true
        }
      }
    ],
    "on_failure" : [
      {
        "set" : {
          "field" : "error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}

You can use an "if" condition in the grok processor; see this example:

POST _ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "type": "event"
      }
    },
    {
      "_source": {
        "type": "another_event"
      }
    }
  ],
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "key",
          "value": "value",
          "if": "ctx.type == 'event'"
        }
      }
    ]
  }
}
1 Like

Thank you!

Follow-up question: how do you express a "contains" check in the "if" condition? Say, for example, if field x contains ":".

If that field is an array, just try ctx.types.contains("event"); the same contains() method also works on string fields in Painless.