Logstash 5.1.2, running with this config file:
input {
  # Consume the ASA syslog stream from Kafka and tag every event with
  # type "firewall" so the filter and output stages can key on it.
  kafka {
    bootstrap_servers => "10.64.2.225:9092"
    topics => "asa-firewall-2"
    type => "firewall"
    # One Kafka consumer per thread; Kafka assigns at most one consumer per
    # partition, so threads beyond the topic's partition count presumably sit idle.
    consumer_threads => 32
    # Upper bound on records returned by a single poll (the plugin takes this
    # option as a string).
    max_poll_records => "8000"
    # No security_protocol / ssl_* options are set, so the broker connection is
    # plaintext and unauthenticated; confirm the bundled kafka input version
    # supports the SSL/SASL options before relying on them.
  }
}
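filter {
  # Parse Cisco ASA syslog lines into discrete fields. Patterns are tried in
  # order and grok stops at the first match, so the patterns that pin the line
  # ending must stay ahead of the looser ones. Every pattern is anchored with
  # ^ and $ to bound regex backtracking on lines that do not match; the many
  # %{DATA} captures already make a non-matching line comparatively costly,
  # so avoid adding unanchored alternatives.
  grok {
    match => {
      "message" => [
        # Built inbound / outbound connection. The mapped address is a literal
        # "( … )" in the log, so the parentheses are escaped: unescaped they form a
        # regex group and only matched because %{DATA} absorbed the parentheses.
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} inbound %{DATA:protocol} connection %{NUMBER:sessionid} for %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} \(%{DATA:JUNK}\) to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} \(%{DATA:JUNK}\)$",
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} outbound %{DATA:protocol} connection %{NUMBER:sessionid} for %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} \(%{DATA:JUNK}\) to %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} \(%{DATA:JUNK}\)$",
        # Teardown with a HH:MM:SS duration and nothing after the byte count.
        # bytes is a NUMBER: with %{DATA:bytes}$ the $ anchor forced DATA to swallow
        # any trailing teardown reason into bytes, so those lines now fall through
        # to the next pattern instead of producing a non-numeric bytes value.
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} %{DATA:protocol} connection %{NUMBER:sessionid} for %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} duration %{TIME:duration} bytes %{NUMBER:bytes}$",
        # Teardown with any duration format (TIME only matches durations under 24h)
        # and an optional trailing reason captured as event.
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} %{DATA:protocol} connection %{NUMBER:sessionid} for %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} duration %{DATA:duration} bytes %{NUMBER:bytes}(?: %{GREEDYDATA:event})?$",
        # Dynamic translation (NAT) teardown / build.
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} dynamic %{DATA:protocol} translation from %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} duration %{TIME:duration}$",
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} dynamic %{DATA:protocol} translation from %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT}$",
        # faddr/gaddr/laddr form; the global address is discarded via JUNK.
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} %{DATA:protocol} for faddr %{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} gaddr %{IPORHOST:JUNK}/%{NUMBER:JUNK} laddr %{IPORHOST:destIP}/%{NUMBER:destPORT}$",
        # NOTE: description is captured twice in this pattern (and in the two other
        # patterns below that end with %{DATA:description}); grok then emits
        # description as a two-element array. Left unchanged because downstream
        # consumers may depend on it; rename the trailing capture if a scalar is wanted.
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description}; Connection for %{DATA:protocol} src %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} dst %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} %{DATA:description}$",
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} for %{DATA:protocol} src %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} dst %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT}$",
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description}; Connection for %{DATA:protocol} src %{DATA:srcinterface}:%{IPORHOST:sourceIP} dst %{DATA:dstinterface}:%{IPORHOST:destIP} %{DATA:description}$",
        # Port-less source (e.g. protocols without ports); parentheses escaped as above.
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} from %{DATA:srcinterface}:%{IPORHOST:sourceIP} \(%{DATA:JUNK}\) to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} \(%{DATA:JUNK}\)$",
        # Port-less teardown; bytes is numeric and a trailing reason is kept as event
        # rather than being folded into bytes.
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} from %{DATA:srcinterface}:%{IPORHOST:sourceIP} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT} duration %{TIME:duration} bytes %{NUMBER:bytes}(?: %{GREEDYDATA:event})?$",
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} for protocol %{DATA:protocol} src %{DATA:srcinterface}:%{IPORHOST:sourceIP} dst %{DATA:dstinterface}:%{IPORHOST:destIP}$",
        "^%{TIMESTAMP_ISO8601:timestamp} %{DATA:msgtype}: %{DATA:description} from %{DATA:srcinterface}:%{IPORHOST:sourceIP}/%{NUMBER:sourcePORT} to %{DATA:dstinterface}:%{IPORHOST:destIP}/%{NUMBER:destPORT}, %{DATA:description}$"
      ]
    }
  }
  # Use the firewall's own timestamp as the event time; the raw timestamp field
  # is kept on the event. Lines that failed grok have no timestamp field and
  # additionally pick up _dateparsefailure.
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  # Drop the raw line and the JUNK placeholder captures only once parsing
  # succeeded; unparsed events keep message so they can be triaged and the
  # pattern list extended.
  if !("_grokparsefailure" in [tags]) {
    mutate {
      remove_field => [ "message", "JUNK" ]
    }
  }
}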
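output {
  # Index the parsed firewall events, one monthly index per stream. This output
  # talks to the cluster over plain HTTP with no credentials: hosts are bare IPs
  # and neither ssl/cacert nor user/password is set, so the session records
  # (source and destination addresses of every connection) cross the network in
  # clear and any host that can reach the cluster's HTTP port can read or write
  # the index. If the cluster supports it, configure transport security and an
  # account, e.g.
  #   ssl => true
  #   cacert => "<path-to-ca-cert.pem>"   # placeholder, substitute real path
  #   user => "<logstash-writer>"         # placeholder
  #   password => "<secret>"              # placeholder
  elasticsearch {
    hosts => ["10.64.2.207", "10.64.2.208", "10.64.2.209"]
    # index is a string option; the original wrapped it in a one-element array,
    # which Logstash silently unwraps.
    index => "asa-%{+YYYY.MM}"
    # Boolean option; was the string "false", which Logstash coerces. The index
    # template is therefore expected to be installed out of band.
    manage_template => false
    # Events per bulk request.
    flush_size => 5000
  }
}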