Question about s3 output plugin

Is it normal for the files Logstash writes to S3 to be the raw log lines, with no parsing applied? It seems as though the s3 output discards any work done by the filters.

    [root@sysmanage logstash-docker]# cat pipeline/logstash.conf 
    input {
      tcp {
        port => '5140'
      }
      udp {
        port => '5140'
      }
    }

    filter {
      if [type] == "syslog" {
        #change to pfSense ip address
        if [host] == ["XXXXXXXXXXXX"] {
          mutate {
            add_tag => ["PFSense", "Ready"]
          }
        }
        if "Ready" not in [tags] {
          mutate {
            add_tag => [ "syslog" ]
          }
        }
      }
    }
    filter {
      if [type] == "syslog" {
        mutate {
          remove_tag => "Ready"
        }
      }
    }
    filter {
      if "syslog" in [tags] {
        grok {
          match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
          add_field => [ "received_at", "%{@timestamp}" ]
          add_field => [ "received_from", "%{host}" ]
        }
        syslog_pri { }
        date {
          match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM  dd HH:mm:ss" ]
          locale => "en"
        }
        if !("_grokparsefailure" in [tags]) {
          mutate {
            replace => [ "@source_host", "%{syslog_hostname}" ]
            replace => [ "@message", "%{syslog_message}" ]
          }
        }
        mutate {
          remove_field => [ "syslog_hostname", "syslog_message", "syslog_timestamp" ]
        }
    #    if "_grokparsefailure" in [tags] {
    #      drop { }
    #    }
      }
    }

    filter {
      if "PFSense" in [tags] {
        grok {
          add_tag => [ "firewall" ]
          match => [ "message", "<(?<evtid>.*)>(?<datetime>(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:[0-5][0-9])) (?<prog>.*?): (?<msg>.*)" ]
        }
        mutate {
          gsub => ["datetime","  "," "]
        }
        date {
          match => [ "datetime", "MMM dd HH:mm:ss" ]
          timezone => "America/Los_Angeles"
        }
        mutate {
          replace => [ "message", "%{msg}" ]
        }
        mutate {
          remove_field => [ "msg", "datetime" ]
        }
      }
      if [prog] =~ /^filterlog$/ {
        mutate {
          remove_field => [ "msg", "datetime" ]
        }
        grok {
          patterns_dir => "/usr/share/logstash/pipeline/patterns"
          match => [ "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IP_SPECIFIC_DATA}%{PFSENSE_IP_DATA}%{PFSENSE_PROTOCOL_DATA}",
                     "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IPv4_SPECIFIC_DATA_ECN}%{PFSENSE_IP_DATA}%{PFSENSE_PROTOCOL_DATA}",
                     "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IPv6_SPECIFIC_DATA}"]
        }
        mutate {
          lowercase => [ 'proto' ]
        }
        geoip {
          add_tag => [ "GeoIP" ]
          source => "src_ip"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
          # Optional GeoIP database
          # Comment out the line below if you do not wish to use it, and omit the last three steps dealing with the (recommended) suffix
          database => "/usr/share/logstash/GeoLite2-City.mmdb"
        }
        mutate {
          convert => [ "[geoip][coordinates]", "float"]
        }
      }
    }


    output {
      s3 {
        access_key_id => 'XXXXXXXXXXXXXXXXXXXX'
        secret_access_key => 'XXXXXXXXXXXXXXXXXXXXXX'
        region => 'us-east-1'
        bucket => 'XXXXXXXXXXXXXXX'
        time_file => 5
        canned_acl => 'private'
        #codec => 'plain' 
        #codec => json_lines 
        #temporary_directory => '/tmp/log-data'
      }
    }

Example s3 output:
    2018-03-07T08:16:28.595Z XXXXXXXXXX <134>Mar 7 08:16:28 filterlog: 5,,,1000000103,em0,match,block,in,4,0x0,,2,5005,0,DF,17,udp,428,$srcip,$dstip,44441,1900,408

That's what you get with the plain codec: the timestamp, hostname, and contents of the message field. What would you like to get instead?

Thanks for replying, Magnus. I'm looking for JSON-like output of the information parsed with grok. I don't currently have my ELK stack up for an example, but I was hoping for something more than just the plain codec's output. Are there other codecs I should look at? The json_lines codec gives me something like:
{"@version":"1","host":"xxxxxxx","@timestamp":"2018-03-16T14:16:46.063Z","message":"<134>Mar 16 14:16:46 filterlog: 97,,,1519707990,em2,match,pass,in,4,0xb8,,63,0,0,DF,17,udp,76,srcip,dstip,123,123,56"}
That also doesn't really help me much.
Thanks for your time.

The json_lines codec dumps the whole event as JSON. If you're not happy with the results then your filters need some love.
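
For what it's worth, switching the s3 output over is just a matter of uncommenting that codec line, e.g. (a minimal sketch reusing the placeholder credentials and bucket from the config above):

    output {
      s3 {
        access_key_id => 'XXXXXXXXXXXXXXXXXXXX'
        secret_access_key => 'XXXXXXXXXXXXXXXXXXXXXX'
        region => 'us-east-1'
        bucket => 'XXXXXXXXXXXXXXX'
        time_file => 5
        canned_acl => 'private'
        # one JSON document per line, containing every field on the event
        # (including anything added by grok, date, geoip, mutate, ...)
        codec => json_lines
      }
    }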

AFAICT you're not applying any filters at all since none of the conditions are true. type is never set to "syslog".
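
If those conditionals are meant to fire, one option is to set the type on the inputs themselves, for example (a sketch only, using the port from the config above):

    input {
      tcp {
        port => 5140
        type => "syslog"   # so the [type] == "syslog" conditionals match
      }
      udp {
        port => 5140
        type => "syslog"
      }
    }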


I literally just noticed that and started rewriting my config. I'll write back after a bit of testing :frowning:
