I'm trying to configure Logstash to accept syslog messages and index them into Elasticsearch / Kibana, but I'm running into several odd issues.
The first is that the actual syslog messages are being rejected with this kind of message:
[2019-02-05T18:40:23,178][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"%{[@metadata][beat]}-%{[@metadata][version]}-2019.12.23", :_type=>"%{[@metadata][type]}", :routing=>nil}, #LogStash::Event:0x6b756b5c], :response=>{"index"=>{"_index"=>"%{[@metadata][beat]}-%{[@metadata][version]}-2019.12.23", "_type"=>"%{[@metadata][type]}", "_id"=>"mFetwGgBOplP3kL8ggVx", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Rejecting mapping update to [%{[@metadata][beat]}-%{[@metadata][version]}-2019.12.23] as the final mapping would have more than 1 type: [%{[@metadata][type]}, doc]"}}}}
This doesn't make any sense to me, because the syslog configuration doesn't use any variables other than the date and time.
The second issue (possibly related) is that the data being written to our indexes named syslog-test-%{+YYYY.MM.dd} seems to contain our Filebeat data! We appear to be getting the same data duplicated in two indexes now.
We have only one instance of Logstash running and are trying to use multiple pipelines for the different data inputs. I suspect we don't have this configured correctly, though, because all the data seems to be getting mingled together.
We have three configuration files in /etc/logstash2/conf.d:
- DNS-Unbound.conf
- Syslog.conf
- WinLogBeats.conf
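For comparison, here is a minimal sketch of how separate pipelines are declared in pipelines.yml, with one entry per config file. The pipeline IDs are hypothetical names, and the location of pipelines.yml (assumed here to be the settings directory, /etc/logstash2) is an assumption; only the conf.d paths come from the listing above. When path.config instead points at the whole conf.d directory, Logstash concatenates every file into a single pipeline, so every event passes through every filter and every output.

```yaml
# pipelines.yml (assumed location: /etc/logstash2/pipelines.yml)
# The pipeline IDs are hypothetical. Each entry loads exactly one config file,
# so its inputs, filters, and outputs stay isolated from the other pipelines.
- pipeline.id: dns-unbound
  path.config: "/etc/logstash2/conf.d/DNS-Unbound.conf"
- pipeline.id: syslog
  path.config: "/etc/logstash2/conf.d/Syslog.conf"
- pipeline.id: winlogbeat
  path.config: "/etc/logstash2/conf.d/WinLogBeats.conf"
```

Note that pipelines.yml is only read when Logstash is started without -f / path.config on the command line and without path.config set in logstash.yml; whether that is the case for this instance is not something the listing above shows.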
19:02:00 elk-02:/etc/logstash2/conf.d$ cat DNS-Unbound.conf
# Logstash Plugin to listen for LogBeats input and send to local Elastic instance #
# Updated: 180802 #
input {
  beats {
    port => 5045
    host => "10.x.x.x"
  }
}
filter {
  grok {
    match => { "message" => "%{GREEDYDATA:datestamp} %{USER}\[%{NUMBER:process_id}:%{NUMBER:instance_id}] %{LOGLEVEL}: %{IP:client_ip} %{GREEDYDATA:query}\. %{WORD:record_type} IN %{WORD:message_flags} %{NUMBER:duration} %{NUMBER:cached} %{NUMBER:cache_id}" }
  }
}
output {
  elasticsearch {
    id => "Unbound"
    hosts => "10.x.x.x:9200"
    manage_template => true
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
19:02:20 elk-02:/etc/logstash2/conf.d$ cat Syslog.conf
input {
  tcp {
    port => 9514
    type => syslog
  }
  udp {
    port => 9514
    type => syslog
  }
}
filter {
  mutate {
    remove_field => [ "host" ]
  }
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["10.x.x.x:9200"]
    index => "syslog-test-%{+YYYY.MM.dd}"
  }
}
19:03:09 elk-02:/etc/logstash2/conf.d$ cat WinLogBeats.conf
input {
  beats {
    port => 5044
    host => "10.x.x.x"
  }
}
filter {
  geoip {
    source => "[event_data][IpAddress]"
  }
  translate {
    field => "[event_id]"
    destination => "[event_id_description]"
    dictionary => {
      "1100" => "The event logging service has shut down"
      "1101" => "Audit events have been dropped by the transport."
      <snip>
      "8191" => "Highest System-Defined Audit Message Value"
    }
    fallback => "Event description not available"
  }
}
output {
  elasticsearch {
    hosts => "10.x.x.x:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}