Hello Community:
Im a new user at ELK. I have deployed an ELK stack and everything works fine except for my Logstash pipelines. I explain myself:
I have a Logstash server with 3 pipelines defined:
Pipeline 1: Manage and process logs sended by rsyslog #1 which sends Site 1 network devices´ logs. Creates 3 indexes: auth,network and login to records related to authorization, network and login respectively.
Pipeline 2: Very alike to pipeline 1. Manage logs sended by rsyslog #2 which sends Site 2 network devices´ logs. Like Pipeline 1 creates 3 indexes with the same characteristics of Pipeline 1.
*Note: the configuration files for this two pipelines are very similar, because in both Sites we have deployed network equipment of the same brand and model.
Pipeline 3: Manage and process logs from a system sended through Filebeat.
I have noted since day 1 of the deployment of the ELK stack that pipelines mixed records in the indexes, for example: Records from indexes created by Pipeline 1 are stored in indexes of Pipeline 2 or Pipeline 3, the same occur in the other pipelines. Records from Pipeline 2 stored in indexes of pipeline 1 or 3.
This is very annoying because as a result I have "dirty" data in every index of my Elasticsearch cluster, I cannot search info from any index without obtain data that supposedly do not belong there. The dashboards become unusable and everything is a mess for me.
I have tried to stop, for example, Pipeline 1 and 2, and everything works fine and records are stored correctly in index created by Pipeline 3. I'm not sure, but maybe is a problem in configuration files of pipelines.
I develop my idea using GitHub - nurhambali/Huawei-ELK: ELK Device Huawei as base.
Thanks to all in advance! Sorry the long post.
Here are the config files:
Pipeline1:
input {
tcp {
host => "logstash_host"
port => 5044
type => Huawei
}
udp {
host => "logstash_host"
port => 5044
type => Huawei
}
}
filter {
if[type]=="Huawei"{
if "01SHELL/5/LOGIN" in [message]{
grok{
match => { "message" => "%{LOGIN}" }
}
}
else if "01SHELL/5/LOGOUT" in [message]{
grok{
match => { "message" => "%{LOGOUT}" }
}
}
(A lot of else if more checking every posible message)
else{
grok{
match => { "message" => "%{DISPLAY_CMDRECORD}" } #OK
################################################
}
}
mutate { add_field => {"Year"=>"%{+YYYY}"}}
### record
if [Brief] == "DISPLAY_CMDRECORD" or [Brief] == "CMDRECORD" (A lot of 'or' clause more)
{
mutate { add_field => {"Alias" => "record"}}
}
### auth
if [Brief] == "LOGINFAILED" or [Brief] == "LOGIN" (A lot of 'or' clause more)
{
mutate { add_field => {"Alias" => "auth"}}
}
### Translet UserType
translate {
field => "[Serverity]"
destination => "[LogLevel]"
override => "true"
dictionary_path => "/etc/logstash/translate/translate.yml"
}
### Translate from host to devices
translate {
field => "[Host]"
destination => "[Device]"
override => "true"
dictionary_path => "/etc/logstash/translate/device.yml"
}
### Drop user _system_
if "_system_" in [User] {drop {}}
if "**" in [User] {drop {}}
if "**" in [UserName] {drop{}}
if "S3928_huawei" in [message]{
mutate {replace => { "Device" =>"S3928_huawei"}}
mutate {replace => {"ModuleName" => "DEV"}}
}
mutate {remove_field => ["@version"]}
mutate {remove_field => ["Serverity"]}
mutate {rename => {"type" => "Type"}}
#mutate {rename => {"message" => "Message"}}
mutate {rename => {"host" => "Host"}}
#mutate { add_field => "LogLevel"}
#mutate { gsub => [ "msg", "\\n\\t", " " ] }
#########################################
}
output {
stdout { codec => rubydebug }
if [Alias] == "auth" {
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "auth-%{+YYYY-MM-dd}"
}
}
else if [Alias] == "record" {
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "record-%{+YYYY-MM-dd}"
}
}
else if "_grokparsefailure" in [tags]{
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "grokparsefailure"
}
}
else {
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "network-%{+YYYY-MM-dd}"
}
}
}```
Pipeline 2:
input {
tcp {
host => "logstash"
port => 5050
type => nap
}
udp {
host => "logstash"
port => 5050
type => nap
}
}
filter {
if[type]=="nap"{
### LOGIN related
if "01SEC/5/ATCKDF" in [message]{
grok{
patterns_dir=>"/etc/logstash/patterns/nap"
match => { "message" => "%{ATCKDF}" }
}
}
(A lot of else if more)
#match => { "message" => "%{}" }
################################################
}
mutate { add_field => {"Year"=>"%{+YYYY}"}}
### record
if [Brief] == "DISPLAY_CMDRECORD" or [Brief] == "CMDRECORD" or [Brief] == "ALARM"
{
mutate { add_field => {"Alias" => "record"}}
}
### Translet UserType
translate {
field => "[Serverity]"
destination => "[LogLevel]"
override => "true"
dictionary_path => "/etc/logstash/translate/translate.yml"
}
### Translate from host to devices
translate {
field => "[Host]"
destination => "[Device]"
override => "true"
dictionary_path => "/etc/logstash/translate/device.yml"
}
### Drop user _system_
if "_system_" in [User] {drop {}}
if "**" in [User] {drop {}}
if "**" in [UserName] {drop{}}
mutate {remove_field => ["@version"]}
mutate {remove_field => ["Serverity"]}
mutate {rename => {"type" => "Type"}}
#mutate {rename => {"message" => "Message"}}
mutate {rename => {"host" => "Host"}}
#mutate { add_field => "LogLevel"}
#mutate { gsub => [ "msg", "\\n\\t", " " ] }
#########################################
}
output {
stdout { codec => rubydebug }
if [Alias] == "auth" {
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "nap_auth-%{+YYYY-MM-dd}"
}
}
else if [Alias] == "record" {
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "nap_record-%{+YYYY-MM-dd}"
}
}
else if "_grokparsefailure" in [tags]{
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "grokparsefailure_nap"
}
}
else {
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "nap_network-%{+YYYY-MM-dd}"
}
}
}
Pipeline3:
input {
beats {
host => "logstash_host"
port => 5045
ssl => false
type=> log
}
}
filter{
if "<LogData>" in [message]{
grok{
match => { "message" => "%{LOGLEVEL:loglevel}%{SPACE}%{DATA}%{SPACE}%{DAY:day_of_week} %{MONTH:month} %{NUMBER:day_number} %{TIME:hour} %{YEAR:year}] %{NOTSPACE} %{NOTSPACE} %{NOTSPACE:ip} %{NOTSPACE:pid} %{NOTSPACE:process} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{GREEDYDATA:message}" }
}
}
}
output {
elasticsearch {
hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
index => "system_server-%{+YYYY-MM-dd}"
}
stdout { codec => rubydebug }
}```