Hi!
I am processing pfSense data with Logstash but am having problems with indexing.
This is my configuration. I have two pfSense instances.
01-inputpf1.conf
# Receive events on port 5140 over both TCP and UDP.
# Every event from either listener is labelled type "syslog1".
input {
  tcp {
    port => 5140
    type => "syslog1"
  }
  udp {
    port => 5140
    type => "syslog1"
  }
}
01-inputpf2.conf
# Receive events on port 1234 over both TCP and UDP.
# Every event from either listener is labelled type "syslog2".
input {
  tcp {
    port => 1234
    type => "syslog2"
  }
  udp {
    port => 1234
    type => "syslog2"
  }
}
10-fifltersyslog1.conf
# Classify events of type "syslog1":
#   - events whose host matches the pfSense address get the "PFSense" tag;
#   - every other event gets the "syslog1" tag.
# "Ready" is only a temporary marker used to tell the two cases apart and is
# removed again before the event leaves this filter.
filter {
  if [type] == "syslog1" {
    #change to pfSense ip address
    if [host] =~ /10\.10\.1\.11/ {
      mutate { add_tag => ["PFSense", "Ready"] }
    }

    # No "Ready" marker means the host check above did not match.
    if "Ready" not in [tags] {
      mutate { add_tag => [ "syslog1" ] }
    }

    # Drop the temporary marker.
    mutate { remove_tag => "Ready" }
  }
}
# Parse events tagged "syslog1" as plain syslog lines.
filter {
  if "syslog1" in [tags] {
    # Split the raw line into timestamp, hostname, program, optional pid and
    # message text; also record when and from which host it was received.
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => {
        "received_at"   => "%{@timestamp}"
        "received_from" => "%{host}"
      }
    }

    syslog_pri { }

    # Traditional syslog timestamps pad single-digit days with an extra space
    # ("Jan  5 10:00:00"), so the two-space pattern is listed first; the
    # original single-space patterns are kept as fallbacks.
    date {
      match  => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
      locale => "en"
    }

    # Only copy the parsed values when grok actually produced them.
    if "_grokparsefailure" not in [tags] {
      mutate {
        replace => {
          "@source_host" => "%{syslog_hostname}"
          "@message"     => "%{syslog_message}"
        }
      }
    }

    # The intermediate grok captures are no longer needed.
    mutate {
      remove_field => [ "syslog_hostname", "syslog_message", "syslog_timestamp" ]
    }
  }
}
11-filtersyslog2.conf
# Classify events of type "syslog2":
#   - events whose host matches the pfSense address get the "PFSense" tag;
#   - every other event gets the "syslog2" tag.
# "Ready" is only a temporary marker used to tell the two cases apart and is
# removed again before the event leaves this filter.
filter {
  if [type] == "syslog2" {
    #change to pfSense ip address
    if [host] =~ /10\.10\.2\.2/ {
      mutate { add_tag => ["PFSense", "Ready"] }
    }

    # No "Ready" marker means the host check above did not match.
    if "Ready" not in [tags] {
      mutate { add_tag => [ "syslog2" ] }
    }

    # Drop the temporary marker.
    mutate { remove_tag => "Ready" }
  }
}
# Parse events tagged "syslog2" as plain syslog lines.
filter {
  if "syslog2" in [tags] {
    # Split the raw line into timestamp, hostname, program, optional pid and
    # message text; also record when and from which host it was received.
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => {
        "received_at"   => "%{@timestamp}"
        "received_from" => "%{host}"
      }
    }

    syslog_pri { }

    # Traditional syslog timestamps pad single-digit days with an extra space
    # ("Jan  5 10:00:00"), so the two-space pattern is listed first; the
    # original single-space patterns are kept as fallbacks.
    date {
      match  => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
      locale => "en"
    }

    # Only copy the parsed values when grok actually produced them.
    if "_grokparsefailure" not in [tags] {
      mutate {
        replace => {
          "@source_host" => "%{syslog_hostname}"
          "@message"     => "%{syslog_message}"
        }
      }
    }

    # The intermediate grok captures are no longer needed.
    mutate {
      remove_field => [ "syslog_hostname", "syslog_message", "syslog_timestamp" ]
    }
  }
}
20-filterpfsense.conf
# Parse events that the tagging filters marked as coming from a pfSense host.
filter {
  # The tagging filters add the tag "PFSense" (not "PFSense1"), so that is the
  # tag that has to be tested here; otherwise this branch never runs.
  if "PFSense" in [tags] {
    # Split the raw line into event id, timestamp, program name and message.
    grok {
      add_tag => [ "firewall" ]
      match => [ "message", "<(?<evtid>.*)>(?<datetime>(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:[0-5][0-9])) (?<prog>.*?): (?<msg>.*)" ]
    }

    # Collapse the double space that precedes a single-digit day
    # ("Jan  5" -> "Jan 5") so the date pattern below can match.
    mutate {
      gsub => ["datetime","  "," "]
    }

    date {
      match => [ "datetime", "MMM dd HH:mm:ss" ]
      timezone => "Asia/Ho_Chi_Minh"
    }

    # Keep only the message text as the event's message.
    mutate {
      replace => [ "message", "%{msg}" ]
    }

    mutate {
      remove_field => [ "msg", "datetime" ]
    }
  }

  # Firewall (filterlog) entries get field extraction and GeoIP enrichment.
  if [prog] =~ /^filterlog$/ {
    mutate {
      remove_field => [ "msg", "datetime" ]
    }

    # The PFSENSE_* patterns are custom and loaded from patterns_dir.
    grok {
      patterns_dir => "/etc/logstash/conf.d/patterns"
      match => [ "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IP_SPECIFIC_DATA}%{PFSENSE_IP_DATA}%{PFSENSE_PROTOCOL_DATA}",
                 "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IPv4_SPECIFIC_DATA_ECN}%{PFSENSE_IP_DATA}%{PFSENSE_PROTOCOL_DATA}",
                 "message", "%{PFSENSE_LOG_DATA}%{PFSENSE_IPv6_SPECIFIC_DATA}"]
    }

    mutate {
      lowercase => [ 'proto' ]
    }

    # Look up the location of the source address.
    geoip {
      add_tag => [ "GeoIP" ]
      source => "src_ip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
    }
  }
}
50-output.conf
# Route each event to an Elasticsearch index chosen by its type, and echo
# every event to stdout for debugging.
#
# NOTE(review): the index template that the elasticsearch output installs by
# default is presumably matched only by "logstash-*" indices; if so, the
# pfsense1-* / pfsense2-* indices do not get its mappings, which may be why
# visualizations fail on them. Confirm, and supply a matching template if needed.
output {
  stdout { codec => rubydebug }

  if [type] == "syslog1" {
    elasticsearch {
      hosts => ["http://10.10.1.162:9200"]
      index => "pfsense1-%{+YYYY.MM.dd}"
    }
    # NOTE(review): syslog1 events are also written to the logstash index,
    # so they are stored twice. Remove this section if that is not intended.
    elasticsearch {
      hosts => ["http://10.10.1.162:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
    }
  } else if [type] == "syslog2" {
    elasticsearch {
      hosts => ["http://10.10.1.162:9200"]
      index => "pfsense2-%{+YYYY.MM.dd}"
    }
  } else {
    # Everything that is neither syslog1 nor syslog2.
    elasticsearch {
      hosts => ["http://10.10.1.162:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
    }
  }
}
When I use the above configuration, Logstash creates three indexes (pfsense1, pfsense2 and logstash), but I cannot use the data to create visualizations.
But when I use the following output, it works and I can create visualizations. How can I split the data from the two pfSense instances into two different indexes? Please help me. Thank you.
# Send every event to one daily "logstash-" index and echo it to stdout for
# debugging.
output {
  stdout { codec => rubydebug }

  elasticsearch {
    hosts => ["http://10.10.1.162:9200"]
    # Four-letter year, consistent with the pfsense1-/pfsense2- index names.
    index => "logstash-%{+YYYY.MM.dd}"
  }
}