Multiple pipelines mixing records in indexes

Hello Community,

I'm a new ELK user. I have deployed an ELK stack and everything works fine except for my Logstash pipelines. Let me explain:

I have a Logstash server with 3 pipelines defined:

Pipeline 1: Manages and processes logs sent by rsyslog #1, which forwards the logs of the Site 1 network devices. It creates 3 indexes (auth, network and login) for records related to authorization, network and login respectively.

Pipeline 2: Very similar to Pipeline 1. It manages logs sent by rsyslog #2, which forwards the logs of the Site 2 network devices, and, like Pipeline 1, it creates 3 indexes with the same characteristics.
*Note: the configuration files for these two pipelines are very similar, because we have deployed network equipment of the same brand and model at both sites.

Pipeline 3: Manages and processes logs from a system, sent through Filebeat.

Since day 1 of the ELK deployment I have noticed that the pipelines mix records across indexes. For example, records that belong in the indexes created by Pipeline 1 end up in the indexes of Pipeline 2 or Pipeline 3, and the same happens with the other pipelines: records from Pipeline 2 are stored in the indexes of Pipeline 1 or 3.

This is very annoying because I end up with "dirty" data in every index of my Elasticsearch cluster: I cannot search any index without getting data that should not be there. The dashboards become unusable and everything is a mess for me.

I have tried stopping, for example, Pipelines 1 and 2, and then everything works fine: records are stored correctly in the index created by Pipeline 3. I'm not sure, but maybe the problem is in the pipelines' configuration files.

I based my setup on the GitHub repository nurhambali/Huawei-ELK (ELK Device Huawei).

Thanks to all in advance, and sorry for the long post!

Here are the config files:

Pipeline 1:

```
input {
  tcp {
    host => "logstash_host"
    port => 5044
    type => "Huawei"
  }
  udp {
    host => "logstash_host"
    port => 5044
    type => "Huawei"
  }
}
filter {
  if [type] == "Huawei" {

    if "01SHELL/5/LOGIN" in [message] {
      grok {
        match => { "message" => "%{LOGIN}" }
      }
    }
    else if "01SHELL/5/LOGOUT" in [message] {
      grok {
        match => { "message" => "%{LOGOUT}" }
      }
    }
    # (a lot more "else if" branches, checking every possible message)
    else {
      grok {
        match => { "message" => "%{DISPLAY_CMDRECORD}" } #OK
      }
      ################################################
    }
    mutate { add_field => { "Year" => "%{+YYYY}" } }

    ### record
    if [Brief] == "DISPLAY_CMDRECORD" or [Brief] == "CMDRECORD" # (plus a lot more 'or' clauses)
    {
      mutate { add_field => { "Alias" => "record" } }
    }

    ### auth
    if [Brief] == "LOGINFAILED" or [Brief] == "LOGIN" # (plus a lot more 'or' clauses)
    {
      mutate { add_field => { "Alias" => "auth" } }
    }

    ### Translate severity to log level
    translate {
      field => "[Serverity]"
      destination => "[LogLevel]"
      override => "true"
      dictionary_path => "/etc/logstash/translate/translate.yml"
    }
    ### Translate from host to devices
    translate {
      field => "[Host]"
      destination => "[Device]"
      override => "true"
      dictionary_path => "/etc/logstash/translate/device.yml"
    }
    ### Drop user _system_
    if "_system_" in [User] { drop {} }
    if "**" in [User] { drop {} }
    if "**" in [UserName] { drop {} }

    if "S3928_huawei" in [message] {
      mutate { replace => { "Device" => "S3928_huawei" } }
      mutate { replace => { "ModuleName" => "DEV" } }
    }

    mutate { remove_field => ["@version"] }
    mutate { remove_field => ["Serverity"] }
    mutate { rename => { "type" => "Type" } }
    #mutate { rename => { "message" => "Message" } }
    mutate { rename => { "host" => "Host" } }
    #mutate { add_field => "LogLevel" }
    #mutate { gsub => [ "msg", "\\n\\t", " " ] }

    #########################################
  }
}
output {
  stdout { codec => rubydebug }
  if [Alias] == "auth" {
    elasticsearch {
      hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
      index => "auth-%{+YYYY-MM-dd}"
    }
  }
  else if [Alias] == "record" {
    elasticsearch {
      hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
      index => "record-%{+YYYY-MM-dd}"
    }
  }
  else if "_grokparsefailure" in [tags] {
    elasticsearch {
      hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
      index => "grokparsefailure"
    }
  }
  else {
    elasticsearch {
      hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
      index => "network-%{+YYYY-MM-dd}"
    }
  }
}
```

Pipeline 2:

```
input {
  tcp {
    host => "logstash"
    port => 5050
    type => "nap"
  }
  udp {
    host => "logstash"
    port => 5050
    type => "nap"
  }
}
filter {
  if [type] == "nap" {

    ### LOGIN related
    if "01SEC/5/ATCKDF" in [message] {
      grok {
        patterns_dir => "/etc/logstash/patterns/nap"
        match => { "message" => "%{ATCKDF}" }
      }
    }
    # (a lot more "else if" branches)

    #match => { "message" => "%{}" }

    ################################################
  }
  mutate { add_field => { "Year" => "%{+YYYY}" } }

  ### record
  if [Brief] == "DISPLAY_CMDRECORD" or [Brief] == "CMDRECORD" or [Brief] == "ALARM"
  {
    mutate { add_field => { "Alias" => "record" } }
  }

  ### Translate severity to log level
  translate {
    field => "[Serverity]"
    destination => "[LogLevel]"
    override => "true"
    dictionary_path => "/etc/logstash/translate/translate.yml"
  }
  ### Translate from host to devices
  translate {
    field => "[Host]"
    destination => "[Device]"
    override => "true"
    dictionary_path => "/etc/logstash/translate/device.yml"
  }
  ### Drop user _system_
  if "_system_" in [User] { drop {} }
  if "**" in [User] { drop {} }
  if "**" in [UserName] { drop {} }

  mutate { remove_field => ["@version"] }
  mutate { remove_field => ["Serverity"] }
  mutate { rename => { "type" => "Type" } }
  #mutate { rename => { "message" => "Message" } }
  mutate { rename => { "host" => "Host" } }
  #mutate { add_field => "LogLevel" }
  #mutate { gsub => [ "msg", "\\n\\t", " " ] }

  #########################################
}
output {
  stdout { codec => rubydebug }
  if [Alias] == "auth" {
    elasticsearch {
      hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
      index => "nap_auth-%{+YYYY-MM-dd}"
    }
  }
  else if [Alias] == "record" {
    elasticsearch {
      hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
      index => "nap_record-%{+YYYY-MM-dd}"
    }
  }
  else if "_grokparsefailure" in [tags] {
    elasticsearch {
      hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
      index => "grokparsefailure_nap"
    }
  }
  else {
    elasticsearch {
      hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
      index => "nap_network-%{+YYYY-MM-dd}"
    }
  }
}
```

Pipeline 3:

```
input {
  beats {
    host => "logstash_host"
    port => 5045
    ssl => false
    type => "log"
  }
}
filter {
  if "<LogData>" in [message] {
    grok {
      match => { "message" => "%{LOGLEVEL:loglevel}%{SPACE}%{DATA}%{SPACE}%{DAY:day_of_week} %{MONTH:month} %{NUMBER:day_number} %{TIME:hour} %{YEAR:year}] %{NOTSPACE} %{NOTSPACE} %{NOTSPACE:ip} %{NOTSPACE:pid} %{NOTSPACE:process} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{GREEDYDATA:message}" }
    }
  }
}
output {
  elasticsearch {
    hosts => ["elastic1:9200","elastic2:9200","elastic3:9200"]
    index => "system_server-%{+YYYY-MM-dd}"
  }
  stdout { codec => rubydebug }
}
```

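Your .conf files are not independent pipelines unless you configure them that way using pipelines.yml. When Logstash is pointed at a directory or glob of .conf files, it concatenates them into a single pipeline, so every event from every input goes through every filter and every output. Since none of your output sections checks which input an event came from, each event is written to an index from each of the three files.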
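A minimal pipelines.yml sketch for this setup might look like the following. It assumes a package install, where the file lives at /etc/logstash/pipelines.yml and the default entry points a single `main` pipeline at `/etc/logstash/conf.d/*.conf`; the pipeline IDs and .conf file names below are hypothetical placeholders, not taken from the original post.

```yaml
# /etc/logstash/pipelines.yml (package-install location, assumed)
# Replaces the default "main" entry (path.config: "/etc/logstash/conf.d/*.conf"),
# which merges every .conf file into ONE pipeline, with one entry per file.
# Pipeline IDs and file names are hypothetical; use your real paths.
- pipeline.id: huawei-site1       # Pipeline 1: rsyslog #1, tcp/udp 5044
  path.config: "/etc/logstash/conf.d/pipeline1.conf"
- pipeline.id: huawei-site2       # Pipeline 2: rsyslog #2, tcp/udp 5050
  path.config: "/etc/logstash/conf.d/pipeline2.conf"
- pipeline.id: filebeat-system    # Pipeline 3: Filebeat (beats), 5045
  path.config: "/etc/logstash/conf.d/pipeline3.conf"
```

With one entry per file, each pipeline has its own inputs, filters and outputs, so an event received on port 5044 can only reach Pipeline 1's outputs. Logstash ignores pipelines.yml when it is started with `-f`/`--path.config` or `-e` on the command line, so the service should be started without those options for this file to take effect.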

Thanks a lot!! You are totally correct!

Best Regards!
