If "keyword" in message not working for logstash

Hello all...
I am receiving logs from 5 different sources on a single port. It is a collection of files being sent through syslog from one server in real time. The server stores logs from 4 VPN servers and one DNS server. The server admin started sending all 5 types of files on a single port, although I asked for something different. Anyway, I decided to make this work as it is.
Below are the different sample types:

<134>Sep 30 22:02:27 mumvpn.domain1.com PulseSecure: id=firewall time="2021-09-30 22:02:27" pri=6 fw=xx2.21.140.100 vpn=ive user=user1 realm="google_auth" roles="" proto=auth src=1xx.87.92.246 dst= dstname= type=vpn op= arg="" result= sent= rcvd= agent="" duration= msg="AUT24803: Host Checker policy 'Hostchecker_Policy' passed on host '1xx.87.92.246' address '7a-fc-ac-08-88-9e'  for user 'user1'."
------------------
<13>Sep 30 22:03:28 xx2.20.43.100 370 <134>1 2021-09-30T22:03:28+05:30 canopus.domain1.com1 PulseSecure: - - - id=firewall time="2021-09-30 22:03:28" pri=6 fw=xx2.20.43.100 vpn=ive user=System realm="google_auth" roles="" proto= src=1xx.99.110.19 dst= dstname= type=vpn op= arg="" result= sent= rcvd= agent="" duration= msg="AUT23278: User Limit realm restrictions successfully passed for /google_auth "
------------------
<134>Sep 30 22:41:43 xx2.20.43.101 1 2021-09-30T22:41:43+05:30 canopus.domain1.com2 PulseSecure: - - - id=firewall time="2021-09-30 22:41:43" pri=6 fw=xx2.20.43.101 vpn=ive user=user22 realm="google_auth" roles="Domain_check_role" proto= src=1xx.200.27.62 dst= dstname= type=vpn op= arg="" result= sent= rcvd= agent="" duration= msg="NWC24328: Transport mode switched over to SSL for user with NCIP xx2.20.210.252 "
------------------
<134>Sep 30 22:36:59 vpn-dns-1 named[130237]: 30-Sep-2021 22:36:59.172 queries: info: client @0x7f8e0f5cab50 xx2.30.16.147#63335 (ind.event.freefiremobile.com): query: ind.event.freefiremobile.com IN A + (xx2.31.0.171)
------------------
<13>Sep 30 22:40:31 xx2.20.43.101 394 <134>1 2021-09-30T22:40:31+05:30 canopus.domain1.com2 PulseSecure: - - - id=firewall time="2021-09-30 22:40:31" pri=6 fw=xx2.20.43.101 vpn=ive user=user3 realm="google_auth" roles="Domain_check_role" proto= src=1xx.168.77.166 dst= dstname= type=vpn op= arg="" result= sent= rcvd= agent="" duration= msg="NWC23508: Key Exchange number 1 occurred for user with NCIP xx2.20.214.109 "

Below is my config file-

input {
        syslog {
                port => 1301
                ecs_compatibility => disabled
                tags => ["vpn"]
        }
}

I tried to apply a condition first to get VPN logs (1st sample logline) and pass it to dissect-

filter {
        if "vpn" in [tags] {
                #if ([message] =~ /vpn=ive/) {
                if "vpn=ive" in [message] {
                        dissect {
                                mapping => { "message" => "%{reserved} id=firewall %{message1}" }
                                # using id=firewall to get KV pairs in message1 
                        }
                }
        }
        else { drop {}  }
# \/ end of filter brace
}

But when I run with this config file, I get a mixture of all 5 types of logs in Kibana. I don't see any dissect failures either.
Another question: if I have to process all 5 types of logs in one config file, would the approach below be a good one?

if "VPN-logline" in [message] { use KV plugin and add tag of "vpn" }
else if "DNS-logline" in [message] { use JSON plugin and tag of "dns"}
else if "something-irrelevant" in [message] { drop {} }

That is exactly what I would expect. All of the lines will have a "vpn" tag, so they will go through the if branch, not the drop. None of the messages contain "vpn-ive" so none will go through the dissect.

Did you mean to test "vpn=ive" in [message]?

Yes @Badger. I meant to check only whether "vpn=ive" is in the message. I assumed that when the logline is received, the if condition checks whether "vpn=ive" is present in the message and only then applies the dissect plugin.
I tried changing this to "mumvpn.domain1.com" (for the 1st logline) but still no luck.
Can any conditional matching be done in the input section as well, e.g. checking whether "vpn=ive" is present in the message?

You can't have conditionals in your input, only in the filter block or in the output block.

What is the conditional you are trying?

In the config you shared you are testing with if "vpn-ive" in [message], but the substring vpn-ive does not exist in any of the example messages you shared; what you have is vpn=ive.

Have you tried with if "vpn=ive" in [message]?

Also, you are using the syslog input. This input applies some parsing to your messages, so the message field in the filter and output blocks does not have the same value as the line sent by the source device.

For example, considering your first example message.

<134>Sep 30 22:02:27 mumvpn.domain1.com PulseSecure: id=firewall time="2021-09-30 22:02:27" pri=6 fw=xx2.21.140.100 vpn=ive user=user1 realm="google_auth" roles="" proto=auth src=1xx.87.92.246 dst= dstname= type=vpn op= arg="" result= sent= rcvd= agent="" duration= msg="AUT24803: Host Checker policy 'Hostchecker_Policy' passed on host '1xx.87.92.246' address '7a-fc-ac-08-88-9e'  for user 'user1'."

The syslog input will use the following grok pattern to parse this message:

<%{POSINT:priority}>%{SYSLOGLINE}

When your event enters the filter block, it will have the following fields:

{
  "program": "PulseSecure",
  "priority": "134",
  "logsource": "mumvpn.domain1.com",
  "message": "id=firewall time=\"2021-09-30 22:02:27\" pri=6 fw=xx2.21.140.100 vpn=ive user=user1 realm=\"google_auth\" roles=\"\" proto=auth src=1xx.87.92.246 dst= dstname= type=vpn op= arg=\"\" result= sent= rcvd= agent=\"\" duration= msg=\"AUT24803: Host Checker policy 'Hostchecker_Policy' passed on host '1xx.87.92.246' address '7a-fc-ac-08-88-9e'  for user 'user1'.\"",
  "timestamp": "Sep 30 22:02:27"
}

So, if you had the conditional if "mumvpn.domain1.com" in [message], it would not work because your message is now different and this substring does not exist.

When using the syslog input you can't build your groks based on the original message from your syslog devices; you need to build them based on the message field created by the input.

Share the conditional you are trying to use, the message that should have matched it, and the JSON document from Elasticsearch for that same message that didn't match.

Another thing: your else { drop {} } condition will never match. You are adding the vpn tag in your input, so every event will have this tag and nothing will enter the else block.

Unless you have different inputs in the same configuration, you do not need to have this first if conditional as it will always be true.
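
As a rough sketch of the points above, a single filter block could branch on the fields the syslog input creates rather than on the raw line. The tag names and the [program] == "named" test are assumptions based on the sample lines in this thread, not something verified against your data. As far as I know, lines the input fails to parse keep their original message, which is why the VPN branch uses a substring test:

filter {
        if "vpn=ive" in [message] {
                # "vpn=ive" appears in every PulseSecure sample above
                mutate { add_tag => ["vpn"] }
        } else if [program] == "named" {
                # the syslog input stores the process name in [program]
                mutate { add_tag => ["dns"] }
        } else {
                # anything matching neither branch is discarded
                drop { }
        }
}

Because no tag is set in the input here, the final else branch can actually be reached.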

Hi @leandrojmp, I made a slight change in the configuration and started getting the desired logs.
Below is my new config-

input {
	syslog {
		port => 1301
		ecs_compatibility => disabled 
#		tags => ["vpn"]
	}
}
filter {
	#if "vpn" in [tags] {
		if "vpn=ive" in [message] {
			dissect {
				mapping => { "message" => "%{reserved} id=firewall %{message1}" }
			}
		}
		else { drop {}  }
	#}
}
output {
#	if "vpn" in [tags] {
		elasticsearch {
			hosts => "localhost"
			index => "vpn1oct"			
			user => "elastic"
			password => "xxxxxxxxxx"
		}
#	}
	stdout { }
}

I had to remove tags from the input section. But this will create problems in production, since I'll have to either merge this config with another master config running for firewalls (which identifies events by tags) or keep this as a separate config file in the conf.d directory.
I'm not sure if I can use another config file in conf.d without tags in the input section.
Also, how can I stop Logstash from automatically parsing syslog messages? I'd rather use dissect for this. Will ecs_compatibility => disabled help with this?

Many thanks...

You can replace the syslog input with the tcp input, the udp input, or both, depending on which protocol your syslog servers are using.

This way you will be able to build your grok based on the original source message.

You can just change your input to the following:

input {
    tcp {
        port => "1301"
    }
    udp {
        port => "1301"
    }
}

The syslog input already listens on tcp and udp.
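
With the tcp or udp input the message field holds the line as it was sent, including the <134> priority and the hostname. The existing "%{reserved} id=firewall %{message1}" mapping should still work, since reserved simply captures more, but the prefix can also be split out if it is needed. A minimal sketch, where syslog_pri and syslog_header are hypothetical field names chosen only for illustration:

filter {
        if "vpn=ive" in [message] {
                dissect {
                        # the text between the first ">" and " id=firewall " lands in syslog_header
                        mapping => { "message" => "<%{syslog_pri}>%{syslog_header} id=firewall %{message1}" }
                }
        }
}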

Thanks @leandrojmp, I am able to get raw syslogs now. I need help with one more thing.
I am using the KV plugin after dissecting the message. The larger value in message gets parsed using the KV plugin.
I get a field called msg (out of message1) and it contains a long text. I've boxed the msg field in blue in the screenshot. If the msg field's value contains only the opening and closing double quotes, its value gets picked up correctly and populates the msg field to its right (text starting with AUT24414: .......). Refer to the first logline.

But (refer to the 2nd logline) if the msg field has another double-quoted string inside the string (underlined in red), it doesn't get parsed correctly. Only the first word gets picked up and the rest is dropped.
I need the entire value of the msg field to be parsed.
I've tried many combinations of gsub without success. Replacing : with a blank doesn't help either.
The actual message for the logline is in message1 only, which has to be parsed by the KV plugin.
The config is below:
Config is below-

input {
        tcp {
                port => 1301
        }
}
filter {
                if "type=vpn" in [message] {
                        dissect {
                                mapping => { "message" => "%{reserved} id=firewall %{message1}" }
                        }
                        #mutate { gsub => ["message1",':'," "] }
                        #mutate { gsub => ["message1",'"',''] }
                        kv { source => "message1"  value_split => "=" whitespace => "strict" }   #field_split => " "  }                         
                        geoip { source => "src"  }
                # \/ end of if vpn type log
                }
        else { drop {}  }
}
output {
                elasticsearch {
                        hosts => "localhost"
                        index => "vpn1oct"
                        user => "elastic"
                        password => "xxxxxxxx"
                }
        stdout { }
}
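
One possible way around the nested quotes, offered as an untested sketch since the failing line is only visible in the screenshot: let dissect take everything after msg= as a single field before kv runs, so kv never has to interpret the inner quotes. This assumes msg= is always the last key in the line, as it is in the samples at the top of the thread. The gsub pattern is likewise an assumption about how the value is quoted.

filter {
        if "type=vpn" in [message] {
                dissect {
                        # msg takes the rest of the line, inner quotes included
                        mapping => { "message" => "%{reserved} id=firewall %{message1} msg=%{msg}" }
                }
                # strip only the outermost pair of double quotes from msg
                mutate { gsub => ["msg", '^"|"\s*$', ""] }
                # message1 no longer contains msg, so kv only sees the simple pairs
                kv { source => "message1" value_split => "=" whitespace => "strict" }
        }
}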
