Logstash - grok filter is successful using grokdebug, but fails live

I have a grok filter that I have built and tested on http://grokdebug.herokuapp.com.

The test sample is:

<190>3259426: *Dec 7 17:43:09.599 UTC: %SEC-6-IPACCESSLOGP: list ACL_3P denied tcp 10.10.10.2(8000) -> 10.20.20.2(59752), 1 packet

The filter is:

%{CISCOIOSLOG}
#patterns
CISCOIOSTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}( %{TZ})?
CISCOIOSLOG %{WORD:log_facility}-%{INT:log_severity}-%{WORD:log_mnemonic}:
%{NUMBER:log_sequence_no}: \*%{CISCOIOSTIMESTAMP:acl_timestamp}: \%%{CISCOIOSLOG} list %{DATA:ACL_name} %{WORD:ACL_action} %{WORD:proto} %{IP:src_ip}\(%{INT:src_port}\) -> %{IP:dst_ip}\(%{INT:dst_port}\)\, %{INT:packets} packet
CISCOINTERFACE %{CISCO_REASON:facility}.%{LOGLEVEL:log_level}: %{NUMBER:log_sequence_no}: \*%{CISCOIOSTIMESTAMP:acl_timestamp}: \%%{CISCOIOSLOG} Line protocol on Interface %{INTERFACE:interface}, changed state to %{WORD:state}

This works in the debugger. I have added the patterns to a file in "/opt/logstash/patterns", and my Logstash conf is below:

#
# INPUT - Logstash listens on port 1514 for these logs.
#

input {
  udp {
    port => 1514
    type => "cisco-syslog"
  }
  
  tcp {
    port => 1514
    type => "cisco-syslog"
  }
}
filter {
  # NOTE: The frontend logstash servers set the type of incoming messages.
  if [type] == "cisco-syslog" {
    # The switches send the same message to all syslog servers for redundancy; this allows us to
    # only store the message in Elasticsearch once, by generating a hash of the message and using
    # that as the document_id.
    fingerprint {
      source              => [ "message" ]
      method              => "SHA1"
      key                 => "839a-3SD2-498d-314f-l18H"
      concatenate_sources => true
    }
    # Parse the log entry into sections.  Cisco doesn't use a consistent log format, unfortunately.
    grok {
      # There are a couple of custom patterns associated with this filter.
      patterns_dir => [ "/opt/logstash/patterns" ]
      match => [
        # IOS ACL
        "message", "%{CISCOIOSACL}",
        # IOS
        "message", "%{CISCOINTERFACE}",
        "message", "%{CISCO_DEFAULT}",
        # IOS
        "message", "%{SYSLOG5424PRI}(%{NUMBER:log_sequence#})?:( %{NUMBER}:)? %{CISCOTIMESTAMPTZ:log_date}: %%{CISCO_REASON:facility}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:message}",
        "message", "%{SYSLOG5424PRI}(%{NUMBER:log_sequence#})?:( %{NUMBER}:)? %{CISCOTIMESTAMPTZ:log_date}: %%{CISCO_REASON:facility}-%{CISCO_REASON:facility_sub}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:message}",
        # Nexus
        "message", "%{SYSLOG5424PRI}(%{NUMBER:log_sequence#})?: %{NEXUSTIMESTAMP:log_date}: %%{CISCO_REASON:facility}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:message}",
        "message", "%{SYSLOG5424PRI}(%{NUMBER:log_sequence#})?: %{NEXUSTIMESTAMP:log_date}: %%{CISCO_REASON:facility}-%{CISCO_REASON:facility_sub}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:message}"
      ]
      overwrite => [ "message" ]
      add_tag => [ "cisco" ]
      remove_field => [ "syslog5424_pri", "@version" ]
    }
  }
  # If we made it here, the grok was successful
  if "cisco" in [tags] {
    date {
      match => [
        "log_date",
        # IOS
        "MMM dd HH:mm:ss.SSS ZZZ",
        "MMM dd HH:mm:ss ZZZ",
        "MMM dd HH:mm:ss.SSS",
        # Nexus
        "YYYY MMM dd HH:mm:ss.SSS ZZZ",
        "YYYY MMM dd HH:mm:ss ZZZ",
        "YYYY MMM dd HH:mm:ss.SSS",
        # Hail Mary
        "ISO8601"
      ]
    }
    # Add the log level's name instead of just a number.
    mutate {
      gsub => [
        "severity_level", "0", "0 - Emergency",
        "severity_level", "1", "1 - Alert",
        "severity_level", "2", "2 - Critical",
        "severity_level", "3", "3 - Error",
        "severity_level", "4", "4 - Warning",
        "severity_level", "5", "5 - Notification",
        "severity_level", "6", "6 - Informational"
      ]
    }
    # Translate the short facility name into a full name.
    # NOTE:  This is a third party plugin: logstash-filter-translate
    translate {
      field       => "facility"
      destination => "facility_full"
      dictionary => [
        "AAA", "Authentication, authorization, and accounting",
        "AAA_CACHE", "Authentication, authorization, and accounting cache",
        ... output omitted
      ]
    } # translate
  } # if
} # filter

output {
  # Something went wrong with the grok parsing; don't discard the messages, though.
  if "_grokparsefailure" in [tags] {
    file {
      path => "/tmp/fail-%{type}-%{+YYYY.MM.dd}.log"
    }
  }
  # The message was parsed correctly, and should be sent to Elasticsearch.
  if "cisco" in [tags] {
    #file {
    #  path => "/tmp/%{type}-%{+YYYY.MM.dd}.log"
    #}
    elasticsearch {
      hosts           => "elasticsearch:9200"
      manage_template => false
      index           => "network-%{+YYYY.MM.dd}"
      document_type   => "%{type}"
      document_id     => "%{fingerprint}"
    }
  }
}

All the logs are just ending up in /tmp/fail-cisco-syslog-2017.12.07.log and are not being processed.
A sample from the logs:

{"@timestamp":"2017-12-07T17:45:57.208Z","@version":"1","host":"192.168.44.1","fingerprint":"03871ef40c9a56e019a09fd18194dd71762df96b","message":"<190>3259426: *Dec 7 17:43:09.599 UTC: %SEC-6-IPACCESSLOGP: list ACL_3P denied tcp 10.10.10.2(8000) -> 10.10.20.2(59752), 1 packet ","type":"cisco-syslog","tags":["_grokparsefailure"]}

It's not clear to me which grok pattern you expect will match the input string.

The standard advice in these cases is to start as simple as possible and build the grok expression from there, adding more and more complexity.
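
For example, a first iteration (a sketch, not the poster's config; "rest" is just an illustrative field name) could anchor on only the syslog priority prefix and capture everything else:

filter {
  grok {
    # Step 1: match only the <190> priority and dump the remainder into a field.
    # SYSLOG5424PRI is a standard grok pattern already used in the config above.
    match => [ "message", "%{SYSLOG5424PRI}%{GREEDYDATA:rest}" ]
  }
}

Once that matches, swap parts of %{GREEDYDATA:rest} for real tokens one at a time (sequence number, timestamp, facility, ...) and re-test after each change, so the first token that breaks the match is obvious.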

I assumed that in the grok match statement it would try to match any of the patterns, and once it had a match it would process the log. Is this not correct? I'm new to Logstash, so I may have misunderstood the match process.

The grok expressions will be tried one by one, in the order specified in the configuration. Once an expression matches, the remaining ones will be skipped.
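
This first-match-wins behaviour corresponds to grok's break_on_match option, which defaults to true. A minimal sketch:

grok {
  # With break_on_match => true (the default) the first matching
  # expression wins and the rest are skipped; set it to false to
  # try every expression even after one matches.
  match => [
    "message", "%{CISCOIOSACL}",      # tried first
    "message", "%{GREEDYDATA:rest}"   # only tried if the line above fails
  ]
}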

That's what I had assumed. The sample log I posted should match the first statement, "message", "%{CISCOIOSACL}", which it does using http://grokdebug.herokuapp.com; however, when I run it in my Logstash container everything is dumped to the error file.

How can I troubleshoot this?
Thanks

And how is CISCOIOSACL defined? I couldn't find it among the standard grok patterns.

It's a custom pattern, added to /opt/logstash/patterns; I've put the pattern in the question above.

If I have more than one custom pattern file, will they all load?

It's a custom pattern, added to /opt/logstash/patterns; I've put the pattern in the question above.

You've included definitions of CISCOIOSTIMESTAMP, CISCOIOSLOG, and CISCOINTERFACE. Not CISCOIOSACL.

If I have more than one custom pattern file, will they all load?

Yes.
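
Every file in each directory listed in patterns_dir is loaded, so patterns can be split across several files. A sketch (the file names are just an illustration):

# /opt/logstash/patterns/cisco-ios    - CISCOIOSACL, CISCOINTERFACE, ...
# /opt/logstash/patterns/cisco-nexus  - Nexus-specific patterns
grok {
  patterns_dir => [ "/opt/logstash/patterns" ]
  match        => [ "message", "%{CISCOIOSACL}" ]
}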

Apologies, I missed that one. All my patterns now look like this:

CISCOIOSTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}( %{TZ})?
CISCOIOSLOG %{WORD:facility}-%{INT:severity_level}-%{WORD:log_mnemonic}:
INTERFACE %{WORD}/%{INT}/%{INT}

CISCOIOSACL %{NUMBER:log_sequence_no}: \*%{CISCOIOSTIMESTAMP:log_date}: \%%{CISCOIOSLOG} list %{DATA:ACL_name} %{WORD:ACL_action} %{WORD:proto} %{IP:src_ip}\(%{INT:src_port}\) -> %{IP:dst_ip}\(%{INT:dst_port}\)\, %{INT:packets} packet
CISCOINTERFACE %{NUMBER:log_sequence_no}: \*%{CISCOIOSTIMESTAMP:log_date}: \%%{CISCOIOSLOG} Line protocol on Interface %{INTERFACE:interface}, changed state to %{WORD:state}
CISCO_DEFAULT %{NUMBER:log_sequence_no}: \*%{CISCOIOSTIMESTAMP:log_date}: \%%{CISCOIOSLOG}%{GREEDYDATA:message}

Also, I think I found most of the issues: further down in the config I was referencing log_date and severity_level, which were not set in CISCOIOSACL. They are now set, and I am getting a date parse failure instead.

Log file:
{"log_sequence_no":"3268364","message":"<190>3268364: *Dec 8 14:37:02.829 UTC: %SEC-6-IPACCESSLOGP: list ACL_3P-OUT denied tcp 10.10.10.2(8000) -> 10.10.20.2(64680), 1 packet ","type":"cisco-syslog","severity_level":"6 - Informational","ACL_name":"ACL_3P","ACL_action":"denied","facility_full":"IP security","dst_ip":"10.10.20.2","packets":"1","tags":["cisco","_dateparsefailure"],"src_ip":"10.10.10.2","src_port":"8000","@timestamp":"2017-12-08T14:39:48.885Z","log_mnemonic":"IPACCESSLOGP","log_date":"Dec 8 14:37:02.829 UTC","proto":"tcp","@version":"1","host":"192.168.44.1","fingerprint":"782937691dae719bea2e34ca3fa91de2a495f043","dst_port":"64680","facility":"SEC"}

I guessed this section has the error. It looked like there was a double space after "Dec", so I added a pattern with that, but no luck:
if "cisco" in [tags] {
date {
match => [
"log_date",
# IOS
"MMM dd HH:mm:ss.SSS ZZZ",
"MMM dd HH:mm:ss.SSS ZZZ",
"MMM dd HH:mm:ss ZZZ",
"MMM dd HH:mm:ss.SSS",
# Nexus
"YYYY MMM dd HH:mm:ss.SSS ZZZ",
"YYYY MMM dd HH:mm:ss ZZZ",
"YYYY MMM dd HH:mm:ss.SSS",
# Hail marry
"ISO8601"
]
}

To match "Dec 8" I believe you need MMM d.
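
The date filter uses Joda-Time patterns, where d parses both one- and two-digit days. A sketch of match entries along those lines (using the log_date field from the config above):

date {
  match => [
    "log_date",
    # "d" covers both "Dec 8" and "Dec 17"
    "MMM d HH:mm:ss.SSS ZZZ",
    "MMM d HH:mm:ss ZZZ",
    "YYYY MMM d HH:mm:ss.SSS ZZZ",
    "ISO8601"
  ]
}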

That may have done it; the /tmp log files are no longer being created. However, I am not seeing anything happen.

I'm setting up ELK as per this guide: http://www.mattkimber.co.uk/elk-stack-in-25-minutes-with-docker/
I've loaded Kibana now and I need to create an index pattern.

The guide states that with a correct filter Logstash should be sending data to Elasticsearch, and then I should be able to create an index. However, each time I refresh the page I still cannot create an index, nor do I see any logs to say things are being processed by Elasticsearch...

How do I troubleshoot this section?
Thanks

The guide states that with a correct filter Logstash should be sending data to Elasticsearch and then I should be able to create an index.

Do you mean index pattern? In Kibana?

You can use Elasticsearch's REST API to check which indexes exist. Knowing that would be a good start.

So the output below would suggest I have created an index as per the Logstash config? Is the index network-2017.12.08? Would I put that in Kibana?

alexw-mbp:itapp alexw$ curl -XGET 'localhost:9200/_cat/indices?v&pretty'
health status index                             uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   .monitoring-logstash-6-2017.12.08 ZNgILIqwRDeEhAd77nFNZw   1   1        364            0    226.2kb        226.2kb
yellow open   .monitoring-es-6-2017.12.08       Uq02k6g9SW-mZGG0PnwC7w   1   1        364           36    281.5kb        281.5kb
yellow open   .kibana                           2ccgsW32Tf6ciKu2x4Oaig   1   1          1            0      3.6kb          3.6kb
yellow open   .monitoring-kibana-6-2017.12.08   sT92QnCjSuSm8bHQsF8qJA   1   1         38            0      145kb          145kb
yellow open   network-2017.12.08                O_J7DSD1TDGUYQpQwRFVIg   5   1         27            0    378.6kb        378.6kb

I did that and it works! Finally :slight_smile:

One thing: I get warnings that document_type => "%{type}" is deprecated. What is it replaced with? I can't seem to find the answer on Google.

Thanks for your help!

One thing: I get warnings that document_type => "%{type}" is deprecated. What is it replaced with?

See Removal of mapping types | Elasticsearch Guide [8.11] | Elastic. Instead of using the document type to signal the type of the event, you can use another field for that. Or keep using the type field for that, but set the document_type option to a constant value.
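
For example, a sketch of the output block from the question adjusted that way ("doc" is just an assumed constant, not a required value):

elasticsearch {
  hosts           => "elasticsearch:9200"
  manage_template => false
  index           => "network-%{+YYYY.MM.dd}"
  # Constant document type instead of the sprintf'd "%{type}";
  # the event's own [type] field is still indexed as a normal field.
  document_type   => "doc"
  document_id     => "%{fingerprint}"
}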
