Is it normal for the files Logstash writes to S3 to contain the raw log lines, with no parsing applied? It looks as though the s3 output discards any work done by the filters.
[root@sysmanage logstash-docker]# cat pipeline/logstash.conf
input {
  tcp {
    port => '5140'
    # the filters below only match events with [type] == "syslog",
    # but the tcp/udp inputs set no type by default, so set it here
    type => 'syslog'
  }
  udp {
    port => '5140'
    type => 'syslog'
  }
}
filter {
  if [type] == "syslog" {
    # change to pfSense IP address
    if [host] == "XXXXXXXXXXXX" {
      mutate {
        add_tag => [ "PFSense", "Ready" ]
      }
    }
    if "Ready" not in [tags] {
      mutate {
        add_tag => [ "syslog" ]
      }
    }
  }
}
filter {
  if [type] == "syslog" {
    mutate {
      remove_tag => "Ready"
    }
  }
}
filter {
  if "syslog" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      # "MMM  d" (two spaces) covers syslog's padding of single-digit days
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      locale => "en"
    }
    if "_grokparsefailure" not in [tags] {
      mutate {
        replace => [ "@source_host", "%{syslog_hostname}" ]
        replace => [ "@message", "%{syslog_message}" ]
      }
    }
    mutate {
      remove_field => [ "syslog_hostname", "syslog_message", "syslog_timestamp" ]
    }
    # if "_grokparsefailure" in [tags] {
    #   drop { }
    # }
  }
}
filter {
  if "PFSense" in [tags] {
    grok {
      add_tag => [ "firewall" ]
      match => [ "message", "<(?<evtid>.*)>(?<datetime>(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:[0-5][0-9])) (?<prog>.*?): (?<msg>.*)" ]
    }
    mutate {
      # collapse the double space syslog pads before single-digit days
      gsub => [ "datetime", "  ", " " ]
    }
    date {
      match => [ "datetime", "MMM dd HH:mm:ss" ]
      timezone => "America/Los_Angeles"
    }
    mutate {
      replace => [ "message", "%{msg}" ]
    }
    mutate {
      remove_field => [ "msg", "datetime" ]
    }
  }
  if [prog] =~ /^filterlog$/ {
    mutate {
      remove_field => [ "msg", "datetime" ]
    }
    grok {
      patterns_dir => "/usr/share/logstash/pipeline/patterns"
      match => [ "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IP_SPECIFIC_DATA}%{PFSENSE_IP_DATA}%{PFSENSE_PROTOCOL_DATA}",
                 "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IPv4_SPECIFIC_DATA_ECN}%{PFSENSE_IP_DATA}%{PFSENSE_PROTOCOL_DATA}",
                 "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IPv6_SPECIFIC_DATA}" ]
    }
    mutate {
      lowercase => [ 'proto' ]
    }
    geoip {
      add_tag => [ "GeoIP" ]
      source => "src_ip"
      # [longitude, latitude] order, so the array can be mapped as a geo_point
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      # Optional: comment out the line below if you do not wish to use a custom GeoIP database
      database => "/usr/share/logstash/GeoLite2-City.mmdb"
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
}
output {
  s3 {
    access_key_id => 'XXXXXXXXXXXXXXXXXXXX'
    secret_access_key => 'XXXXXXXXXXXXXXXXXXXXXX'
    region => 'us-east-1'
    bucket => 'XXXXXXXXXXXXXXX'
    time_file => 5
    canned_acl => 'private'
    #codec => 'plain'
    #codec => json_lines
    #temporary_directory => '/tmp/log-data'
  }
}
Example s3 output:
2018-03-07T08:16:28.595Z XXXXXXXXXX <134>Mar 7 08:16:28 filterlog: 5,,,1000000103,em0,match,block,in,4,0x0,,2,5005,0,DF,17,udp,428,$srcip,$dstip,44441,1900,408
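If I'm reading that correctly, the line above is just %{@timestamp} %{host} %{message}, which I believe is what the s3 output's default line codec writes, so the fields the filters add would not show up in the file either way. For reference, this is the output block as I would expect it to preserve the filtered event, with the json_lines codec uncommented (a sketch, credentials and bucket redacted as above):

output {
  s3 {
    access_key_id => 'XXXXXXXXXXXXXXXXXXXX'
    secret_access_key => 'XXXXXXXXXXXXXXXXXXXXXX'
    region => 'us-east-1'
    bucket => 'XXXXXXXXXXXXXXX'
    time_file => 5
    canned_acl => 'private'
    # write each event as one JSON object per line, including the
    # fields and tags added by the filters above
    codec => json_lines
  }
}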