Logstash config for parsing ASA logs

I am brand new to ELK/Elastic, so I'm sure I'm doing at least a few things wrong. I found a couple of examples of Logstash config files for parsing ASA logs, similar to ones I've seen on this forum. So far the config parses the logs it's supposed to with no issues.

I am getting _grokparsefailure on some logs that I do not care about and would actually prefer were just dropped. My understanding was that if grok could not parse a log, it would be dropped and never reach the output section. Is there an easy way to drop these events without forwarding them on to Elasticsearch?

The other question I have is around using logic in the filter section. I am going to be taking in logs from around 10 ASAs. It seemed easiest to have each one send logs over a different port so I could add a field indicating which ASA sent them. The way my config file works right now is fine for that, but I don't really like that the firewall name goes into the "type" field. I made several attempts to use an if statement to add a "device" field with the corresponding firewall name based on the port the log was received on, but I never seem to get the value of port. That is why I added the else statement and attempted to insert the value of port into a field. No matter what I tried, the config file either failed validation or put the literal word "port" into the field instead of the numeric value.
I am able to successfully get the value of type by using:

add_field => ["EXTRA TYPE", "%{type}"]

The same format did not work for port though.
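For reference, branching on a field that actually exists on the event does work; here is a minimal sketch using the type values from the config below (the udp input does not appear to add a per-event port field, which would explain why %{port} never resolves):

    # works because each udp input sets "type" on its events;
    # there is no per-event "port" field for %{port} to resolve against
    if [type] == "ASA Firewall 2" {
        mutate {
            add_field => ["device", "ASA Firewall 2"]
        }
    }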

    input {
        # Receive Cisco ASA logs, one UDP port per firewall
        udp {
            port => 21002
            #type => "cisco-asa"
            type => "ASA Firewall 2"
        }
        udp {
            port => 21008
            #type => "cisco-asa"
            type => "ASA Firewall 8"
        }
        #tcp {
        #    port => 21002
        #    type => "cisco-asa"
        #}
        #tcp {
        #    port => 21008
        #    type => "cisco-asa"
        #}
    }

    filter {
        # Split the syslog part and Cisco tag out of the message
        grok {
            match => ["message", "%{CISCO_TAGGED_SYSLOG} %{GREEDYDATA:cisco_message}"]
        }

        # Parse the syslog severity and facility
        syslog_pri { }
        # Parse the date from the "timestamp" field to the "@timestamp" field
        date {
            match => ["timestamp",
                "MMM dd HH:mm:ss",
                "MMM  d HH:mm:ss",
                "MMM dd yyyy HH:mm:ss",
                "MMM  d yyyy HH:mm:ss"
            ]
            timezone => "America/Chicago"
        }

        # Clean up redundant fields if parsing was successful
        if "_grokparsefailure" not in [tags] {
            mutate {
                rename => ["cisco_message", "message"]
                remove_field => ["timestamp"]
            }
        }

        #if [%{port}] == "21002" {
        #    mutate {
        #        add_field => ["device", "ASA Firewall 2"]
        #    }
        #}
        #else if "port" == "21008" {
        #    mutate {
        #        add_field => ["device", "ASA Firewall 8"]
        #    }
        #}

        #else{
        #    mutate {
        #        add_field => ["device","THIS WAS AN ELSE"]
        #        #add_field => ["JUUUUUUNKKKK", {port}] - bad syntax 
        #        #add_field => ["JUUUUUUNKKKK", "{port}"] - "JUUUUUUNKKKK" => "{port}",
        #        #add_field => ["JUUUUUUNKKKK", port] - "JUUUUUUNKKKK" => "port",
        #        add_field => ["EXTRA TYPE", "%{type}"]
        #        add_field => ["JUUUUUUNKKKK", %{port}] - bad syntax
        #        add_field => ["JUUUUUUNKKKK", "%{port}"]
        #
        #
        #    }
        #}    
        # Extract fields from each of the detailed message types
        # The patterns provided below are included in Logstash since 1.2.0
        grok {
            match => [
                "message", "%{CISCOFW106001}",
                "message", "%{CISCOFW106006_106007_106010}",
                "message", "%{CISCOFW106014}",
                "message", "%{CISCOFW106015}",
                "message", "%{CISCOFW106021}",
                "message", "%{CISCOFW106023}",
                "message", "%{CISCOFW106100}",
                "message", "%{CISCOFW110002}",
                "message", "%{CISCOFW302010}",
                "message", "%{CISCOFW302013_302014_302015_302016}",
                "message", "%{CISCOFW302020_302021}",
                "message", "%{CISCOFW305011}",
                "message", "%{CISCOFW313001_313004_313008}",
                "message", "%{CISCOFW313005}",
                "message", "%{CISCOFW402117}",
                "message", "%{CISCOFW402119}",
                "message", "%{CISCOFW419001}",
                "message", "%{CISCOFW419002}",
                "message", "%{CISCOFW500004}",
                "message", "%{CISCOFW602303_602304}",
                "message", "%{CISCOFW710001_710002_710003_710005_710006}",
                "message", "%{CISCOFW713172}",
                "message", "%{CISCOFW733100}"
            ]
        }
    }


    output {
        # Archive Cisco ASA firewall logs on disk based on the event's timestamp
        # Results in directories for each year and month, with conveniently-named log files, like:
        # /path/to/archive/cisco-asa/2014/2014-09/cisco-asa-2014-09-24.log
        
        stdout { codec => rubydebug }

        #file {
        #    path => "/opt/logstash_logs/%{type}/%{+YYYY}/%{+YYYY-MM}/%{type}-%{+YYYY-MM-dd}.log"
        #}

        # Also output to Elasticsearch for review in Kibana
        elasticsearch {
            hosts => ["localhost:9200"] 
        }
    }
if "_grokparsefailure" in [tags] { drop {} }

will drop events that you are unable to parse.

The tcp input will set host, port, and [@metadata][ip_address] fields on each event that it flushes to the pipeline. Instead of configuring multiple inputs, you could use these to identify the client. If you did not set the dns_reverse_lookup_enabled option, host will be a hostname (provided the DNS lookup succeeds).
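For example, a minimal sketch of that approach (placeholder addresses; assumes the tcp input and the fields described above):

    filter {
        # [@metadata][ip_address] is set by the tcp input; @metadata
        # fields are not passed to outputs, so nothing extra is indexed
        if [@metadata][ip_address] == "x.x.x.2" {
            mutate { add_field => ["device", "ASA Firewall 2"] }
        } else if [@metadata][ip_address] == "x.x.x.8" {
            mutate { add_field => ["device", "ASA Firewall 8"] }
        }
    }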

Dropping the logs worked perfectly!
So, a word of warning to anyone who comes across this and considers switching Cisco ASAs from UDP to TCP for syslog: I caused a major outage when I tried to switch to TCP. (By default an ASA blocks new connections through the firewall when a TCP syslog destination becomes unreachable, unless logging permit-hostdown is configured; that behavior is the likely culprit in an outage like this.)

I had TCP in there just for testing log ingestion through telnet and was planning on using UDP, until I thought I could get more information about the client sending the logs by using TCP. After the outage was resolved, I switched back to UDP. I had all the ASAs send to UDP port 514 and used iptables to redirect that traffic to port 5514, so I only needed one listener (a sketch of such a redirect rule follows the snippet below):

    udp {
        port => 5514
        type => "cisco-asa"
    }
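The exact iptables rule isn't shown above, but a typical NAT redirect for this setup would look something like the following (an assumption, not the rule actually used; adjust to your environment):

    # hypothetical rule: redirect inbound syslog on UDP 514 to the
    # unprivileged port 5514 that Logstash listens on
    iptables -t nat -A PREROUTING -p udp --dport 514 -j REDIRECT --to-ports 5514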

I was able to identify the ASA by using [host]:

    if "x.x.x.1" in [host] {
        mutate {
            add_field => ["device","firewall 1"]
        }
    }
    else if "x.x.x.2" in [host] {
        mutate {
            add_field => ["device", "Firewall 2"]
        }
    }
