Using two input plugins ( beats and udp ) and output it to Elasticsearch

I am using the following logstash configuration but output is only on single index pattern and other one is not working :

I am using the following logstash configuration but output is only on single index pattern and other one is not working :

input{
udp{
port => 514
type => "works"
}
beats{
port => 5044
type => "metricbeat"
}
}

filter{
if [type] == "works"{
grok {
match => {"message" => "<%{INT:loglvl}>worksController: %{LOGLEVEL:loglevel} %{USERNAME:user} %{WORD:ORGNAME} (?<ORG_ID>(?:[A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12})) %{WORD:logger}(?:\s+[%{GREEDYDATA:temp}]([.:])?)? %{GREEDYDATA:msg}" }
}

	if "_grokparsefailure" not in [tags] {
		if [temp] {
			mutate {
					replace => {"message" => "<%{loglvl}>worksController: [%{temp}]: %{msg}"}
					remove_field => [ "msg" ]  
					gsub => ["temp"," ","_"]
				}
		}
		else {
			mutate {
				replace => {"message" => "<%{loglvl}>worksController: %{msg}"}
				remove_field => [ "msg" ]
			}
		}

		if [temp] {
			mutate {
				add_tag => ["%{temp}"]
			}
		}

		if [temp] {
			mutate {
			  remove_field => [ "temp" ]
			}
		  }

		if " logged in" in [message] {
			mutate {
				add_tag => ["logged_in","%{user}_logged_in"]
			}
		}

		if " successfully logged out" in [message] {
			mutate {
				add_tag => ["logged_out","%{user}_logged_out"]
			}
		}

		if " 'worksadmin' successfully logged into " in [message] {
			mutate {
				add_tag => ["admin_logged_in","works"]
			}
		}

		if " and has been Disconnected." in [message] {
			mutate {
			add_tag => ["idle_disconnected","%{user}_idle_disconnected"]
			}
		}

		if " and has been Logout." in [message] {
			mutate {
				add_tag => ["idle_logout","%{user}_idle_logout"]
			}
		}

		mutate {
			add_tag => ["_user_%{user}"]
		}
		mutate {
			add_tag => ["%{type}_user"]
		}
	}  
	else {
		mutate {
			add_field => {"loglevel" => "ERROR"}
			add_tag => ["error_log","error_traceback"]
		}
	}
}
if [type] == "metricbeat" {
	mutate {
		add_tag => ["metricbeat"]
	}
}

}

output {
if "works" in [tags] {
elasticsearch {
hosts => ["localhost:9200"]
codec => "rubydebug"
sniffing => true
manage_template => false
index => "works-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
if "metricbeat" in [tags]{
elasticsearch {
hosts => ["localhost:9200"]
codec => "rubydebug"
sniffing => true
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
}

You have if "works" in [tags] { # do stuff }
In your output block, but I can't see it added anywhere in the configuration (while the metricbeat tag is added). Then again, it's hard to follow without proper formatting.

Also, using a mutate filter to just add a tag, to only use that tag for output filtering is overkill. I.e. this part

if [type] == "metricbeat" {
	mutate {
		add_tag => ["metricbeat"]
	}
}

You can filter your output based on any field, so just checking for type instead of adding extra tags should work quite fine.

Thanks Paris. I really appreciate that.

**** _ In your output block, but I can't see it added anywhere in the configuration (while the metricbeat tag is added)._ **

With reference to above ==> I had added "works" in the Configuration. I haven't pasted above. Sorry about that.
I tried the output filter based on type but only getting " hyworks-* " output on Kibana and Elasticsearch.
I am not getting the "metricbeat-" output on Kibana and Elasticsearch. I mean to say that output plugin of "metricbeat- " is not working but the other one is working.

Below is the proper format that I am using in input plugin :-

input {
	udp {
		port => 514
		type => "hyworks"
	}
	beats {
		port => 5044
		type => "metricbeat"
	}
}

Below is the proper format that I am using in filter plugin :-

filter {
	if [type] == "hyworks"   {
		grok {
			match => {"message" => "<%{INT:loglvl}>HyworksController: %{LOGLEVEL:loglevel} %{USERNAME:user} %{WORD:ORGNAME} (?<ORG_ID>(?:[A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12})) %{WORD:logger}(?:\s+\[%{GREEDYDATA:temp}\]([\.:])?)? %{GREEDYDATA:msg}" }
		}
      
		if "_grokparsefailure" not in [tags] {
  
			if [temp] {
				mutate {
					replace => {"message" => "<%{loglvl}>HyworksController: [%{temp}]: %{msg}"}
					remove_field => [ "msg" ]
					gsub => ["temp"," ","_"]
				}
			}
			else {
				mutate {
					replace => {"message" => "<%{loglvl}>HyworksController: %{msg}"}
					remove_field => [ "msg" ]
				}
			}

			if [temp] {
				mutate {
					add_tag => ["%{temp}"]
				}
			}

			if [temp] {
				mutate {
					remove_field => [ "temp" ]
				}
			}

									############**************************************** FILTER ****************************************#########

			if " logged in" in [message] {
				mutate {
					add_tag => ["logged_in","%{user}_logged_in"]
				}
			}

			if " successfully logged out" in [message] {
				mutate {
					add_tag => ["logged_out","%{user}_logged_out"]
				}
			}

			if " 'hyworksadmin' successfully logged into " in [message] {
				mutate {
					add_tag => ["admin_logged_in","hyworks"]
				}
			}
    
			if " and has been Disconnected." in [message] {
				mutate {
					add_tag => ["idle_disconnected","%{user}_idle_disconnected"]
				}
			}

			if " and has been Logout." in [message] {
				mutate {
					add_tag => ["idle_logout","%{user}_idle_logout"]
				}
			}

			mutate {
				add_tag => ["_user_%{user}"]
			}
		}  
								############**************************************** FILTER ****************************************#########
		else {
			mutate {
				add_field => {"loglevel" => "ERROR"}
				add_tag => ["error_log","error_traceback"]
			}
		}
	}
	if [type] == "metricbeat" {
		# No filtering is required : Do Nothing
	}
}

Below is the proper format that I am using in output plugin :-

output {
	if [type] == "hyworks" { 
		elasticsearch {
			hosts => ["localhost:9200"]
			codec => "rubydebug"
			sniffing => true
			manage_template => false
			index => "hyworks-%{+YYYY.MM.dd}"
		}
		stdout {
			codec => rubydebug 
		}
    }
	if [type] == "metricbeat" {
		elasticsearch {
			hosts => ["localhost:9200"]
			codec => "rubydebug"
			sniffing => true
			manage_template => false
			index => "metricbeat-%{+YYYY.MM.dd}" 
			document_type => "%{[@metadata][type]}" 
		}
	}
}

Hmm, alright, let's break it down. First of all, whatever happens in the filter section shouldn't have any impact on whether the events end up in Elasticsearch (since you're not dropping events there).

You can try and comment out the entire filter section to simplify the debug process (see if any events are ending up in Elasticsearch in their raw format), and after those issues are resolved you can start adding the filter section back in.

  1. You setting a type in the beats input has no effect, since as per the documentation

The Beats shipper automatically sets the type field on the event. You cannot override this setting in the Logstash config. If you specify a setting for the type config option in Logstash, it is ignored.

  1. Using this in your Elasticsearch output has no effect, just delete that line from both outputs (Not sure if it even creates issues, never tried it myself).
codec => "rubydebug"
  1. Have you tried the stripped down configuration proposed in the documentation linked above? If not, maybe try it to see if you can receive events as-is from the beats input.

Thanks Paris. I am glad you replied asap. I really appreciate your help. Thanks a lot !! You made my Day !!

Thanks you very much !!

The Beats shipper automatically sets the type field on the event. You cannot override this setting in the Logstash config. If you specify a setting for the type config option in Logstash, it is ignored.

It helped me a lot to resolve the above issue. I haven't changed the filter configurations.

I have used the following configurations and it resolved the above issue.

input {
	udp {
		port => 514
		type => "hyworks"
	}
	beats {
		port => 5044
	}
}

filter {
	if [type] == "hyworks"   {
		grok {
			match => {"message" => "<%{INT:loglvl}>HyworksController: %{LOGLEVEL:loglevel} %{USERNAME:user} %{WORD:ORGNAME} (?<ORG_ID>(?:[A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12})) %{WORD:logger}(?:\s+\[%{GREEDYDATA:temp}\]([\.:])?)? %{GREEDYDATA:msg}" }
		}
      
		if "_grokparsefailure" not in [tags] {
  
			if [temp] {
				mutate {
					replace => {"message" => "<%{loglvl}>HyworksController: [%{temp}]: %{msg}"}
					remove_field => [ "msg" ]
					gsub => ["temp"," ","_"]
				}
			}
			else {
				mutate {
					replace => {"message" => "<%{loglvl}>HyworksController: %{msg}"}
					remove_field => [ "msg" ]
				}
			}

			if [temp] {
				mutate {
					add_tag => ["%{temp}"]
				}
			}

			if [temp] {
				mutate {
					remove_field => [ "temp" ]
				}
			}

									

			if " logged in" in [message] {
				mutate {
					add_tag => ["logged_in","%{user}_logged_in"]
				}
			}

			if " successfully logged out" in [message] {
				mutate {
					add_tag => ["logged_out","%{user}_logged_out"]
				}
			}

			if " 'hyworksadmin' successfully logged into " in [message] {
				mutate {
					add_tag => ["admin_logged_in","hyworks"]
				}
			}
    
			if " and has been Disconnected." in [message] {
				mutate {
					add_tag => ["idle_disconnected","%{user}_idle_disconnected"]
				}
			}

			if " and has been Logout." in [message] {
				mutate {
					add_tag => ["idle_logout","%{user}_idle_logout"]
				}
			}

			mutate {
				add_tag => ["_user_%{user}"]
			}
		}  
		else {
			mutate {
				add_field => {"loglevel" => "ERROR"}
				add_tag => ["error_log","error_traceback"]
			}
		}
	}
}


output {
	
	if [type] == "hyworks" { 
		elasticsearch {
			hosts => ["localhost:9200"]
			sniffing => true
			manage_template => false
			index => "hyworks-%{+YYYY.MM.dd}"
		}
    }
	else{ 
		elasticsearch {
			hosts => "localhost:9200"
			manage_template => false
			index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
			document_type => "%{[@metadata][type]}" 
		}
	}
	
}

Now, the above issue is resolved for UDP and beats, but if I have beats of Filebeat and Metricbeat then how to filter the output based on the type.
Any suggestions ?

I got the logs of Filebeat and Metricbeat using the same configuration. Thanks for everything. !!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.