How to determine when dissect fails and branch to try a different dissect clause?

When a dissect clause pattern fails to match the message in hand, it would be nice to try another. This arises when different log-entry formats cannot be distinguished in advance by any available conditional test.

I have tried a) multiple dissect clauses (one after the other), b) separate mapping clauses (ibid), and c) separate message patterns, all unsuccessfully. I have also tried d) something called "_dissectfailure".

I saw "_dissectfailure" mentioned somewhere as a value in tags, but there is no tags field listed (according to https://www.elastic.co/guide/en/beats/filebeat/current/exported-fields-log.html) and I could not make this work:

if "_dissectfailure" in [tags]
  ...

How can this be solved?


Hi Russell,

The tags field will exist and be added by the Logstash pipeline. What you are trying to do is perfectly correct and should work. What does your Logstash config look like? Are you nesting the next dissect in the if else statements?

Have you set the tag_on_failure to some value other than default?
https://www.elastic.co/guide/en/logstash/current/plugins-filters-dissect.html#plugins-filters-dissect-tag_on_failure
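
For example (the field names and tag values here are just placeholders), giving the first dissect its own failure tag and keying the fallback off it should do what you want:

filter
{
  dissect
  {
    mapping        => { "message" => "%{field_a} %{field_b}" }
    tag_on_failure => [ "_dissect1_failed" ]
  }
  if "_dissect1_failed" in [tags]
  {
    # First pattern did not match; try the next one.
    dissect
    {
      mapping        => { "message" => "%{field_c}: %{field_d}" }
      tag_on_failure => [ "_dissect2_failed" ]
    }
  }
}

Using a distinct tag_on_failure per dissect also tells you afterwards which pattern, if any, finally matched.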

I've built it atop the popular sebp/elk container adding only my own filter code. (Are the elements/files/definitions of the pipeline executed serially in order?) Here's what I have:

02-beats-input.conf (from sebp/elk):
--just input configuration.

08-cef.conf:
--my own, layered atop sebp/elk, and where I want to use "_dissectfailure". I've written a couple of Common Event Format (CEF) pattern matchers using dissect, plus several other pattern matchers for other types of log entries (also using dissect). All of these work; I've already tested them individually, but I'm struggling to get them all into the code at once. What I've got here seems to work so far, but I need to add half a dozen more of these separate dissect constructs, tried one after another, to the [source] =~ "debug" case to handle different debug log statements, hence my question:

filter
{
  if [source] =~ "debug"
  {
    # send debug.log entries through here:
    dissect
    {
      mapping =>
      {
        "message" => "[%{ignore}] %{acme.date} %{acme.time} - REST: Path: %{acme.restpath}"
      }
      # Now that the new, split-out fields are created, we don't need to keep
      # 'message' any longer:
      remove_field => [ "message" ]
    }
  }
  else
  {
    # send audit.log (purely CEF) entries through here:
    dissect
    {
      mapping =>
      {
        "message" => "%{acme.date} %{acme.time} CEF:%{acme.version}|%{acme.device_vendor}|%{acme.device_product}|%{acme.device_version}|%{acme.device_event_class_id}|%{acme.name}|%{acmeda.severity}|%{acme.extensions}"
      }
      # Now that the new, split-out fields are created, we don't need to keep
      # 'message' any longer:
      remove_field => [ "message" ]
    }

    # Parse 'acme.extensions' for its key-value pairs and put the keys out as
    # separate fields (with values). Values parsed out are impoverished beyond
    # the first space they contain without 'field_split_pattern' and
    # 'whitespace' below. The results are prefixed for easy recognition.
    kv
    {
      source              => "acme.extensions"
      field_split_pattern => " (?=[A-Za-z0-9]+=)"
      whitespace          => "strict"
      prefix              => "acme."
    }
  }
}

09-syslog.conf (from sebp/elk):

filter
{
  if [type] == "syslog"
  {
    grok
    {
      match =>
      {
        "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"
      }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date
    {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

11-nginx.conf (from sebp/elk):

filter
{
  if [type] == "nginx-access"
  {
    grok
    {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}

30-output.conf (from sebp/elk):
--just output configuration

tag_on_failure: I don't touch this; I was unaware of its existence and don't know what to do with it nor where to put it.
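
(From the documentation page linked above, it looks as though it belongs inside the dissect block itself, something like this; the tag value is my own invention:)

dissect
{
  mapping        => { "message" => "..." }
  tag_on_failure => [ "_my_dissect_failed" ]
}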

Nesting of additional dissect paragraphs: I have tried this nesting:

filter
{
  dissect
  {
    ...
  }
  if "_dissectfailure" not in [ tags ]
  {
    dissect
    {
      ...
    }
  }
}
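
(Re-reading this now, I suspect the conditional is inverted: with "not in", the second dissect runs only when the first one succeeds. Presumably the fallback shape should instead be:)

filter
{
  dissect
  {
    ...
  }
  if "_dissectfailure" in [tags]
  {
    # First pattern failed; try the alternative.
    dissect
    {
      ...
    }
  }
}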

What makes this hard is that when newly added filter code breaks and produces nothing, it's very difficult to determine which aspect of the addition broke a previously working filter. Maybe there are more sophisticated approaches to filter development, but I don't know them yet.

Thank you for responding!

Hi Russell,

Yes, you are right: they are combined into a single Logstash pipeline. Logstash follows the UNIX convention of merging the files and creating a single channel. The issue has been discussed here:

If you have multiple inputs, it would probably make sense to use the Multiple Pipelines feature in Logstash.
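
For example, a minimal pipelines.yml separating the two flows might look like this (the IDs and paths here are illustrative):

- pipeline.id: beats
  path.config: "/etc/logstash/conf.d/beats/*.conf"
- pipeline.id: syslog
  path.config: "/etc/logstash/conf.d/syslog/*.conf"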

Your issue is mostly with how Logstash merges your files. Could you perhaps follow a pattern similar to this:

### Input Section ###
10_input_beats.conf
10_input_syslog.conf
### End ###

### Filter Section ###
10_filter_syslog.conf
20_filter_syslog.conf
30_filter_beats.conf
### End ###

### Output Section ###
10_output.conf
### End ### 

This will ensure your filters are applied in the correct order.

Thanks, Nachiket. The pipelining set up by sebp/elk was what I was respecting, as you can see from my post, and I somewhat naively followed its lead by fitting my numeric filename prefixes in just as it had done. This seems to work fine. What doesn't work is the nesting of additional dissect paragraphs and error handling. I wrote a new post on this: Use of keyword 'else' causes error "Couldn't find any filter plugin named 'else'.".

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.