Filtering on external syslog message.keyword for SRC=aaa.bbb.ccc.ddd IP address

I am new to the list. Hopefully I am in the right thread location.
I have an external remote syslog feed being forwarded to an internal Logstash syslog input and sent on to Elasticsearch. Typically the event shows the day, time, and message in the Discover display.

It is the content of the message that I would like visibility into. Within that message I get the remote syslog server name, MAC, interface, SRC= IP address, and DST= IP address. I would like to filter on the SRC=aaa.bbb.ccc.ddd values. GeoIP traceability would be nice for a dashboard.
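For context, the ingest side is nothing special; it is roughly the stock syslog input feeding Elasticsearch. The port, host, and index name below are placeholders rather than my real values:

input {
  syslog {
    port => 5514                          # placeholder; the real listener uses a different port
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]    # placeholder host
    index => "syslog-%{+YYYY.MM.dd}"      # placeholder index name
  }
}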

Long ago I looked at this in terms of regular-expression filtering, but I have lost that information. I am sure things have changed since then.

Any help or direction would be appreciated.
Very Best Regards

Welcome to the forums!

Do you have an example of anything you have tried so far (configs)?
Any example logs or JSON inputs, and what you'd like the output to look like?
It helps if you post details like that; otherwise this could be done 1000 ways.

Apologies for the delay.

Here is my (very dated) Logstash filter file. I see that it is set up basically for SSH metrics:
filter {
  if [fileset][module] == "system" {
    # Filebeat system module: auth logs (sshd, sudo, groupadd, useradd).
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                 "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                 "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                 "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                 "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
                 "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
                 "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE" => "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      # GeoIP enrichment on the SSH client address.
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }
}

A typical Kibana Discover line looks like the ones below. I would like to be able to filter on internal message content for the dropped source IPs (e.g. SRC=185.156.73.60) and see how many drops, and from which locations, over a period of time. Of course, source and destination ports would be of interest as well. Hopefully I am providing the proper information.

Here are a few message examples; I would like to filter on SRC=, SPT=, and DPT= (a rough sketch of what I think the parsing might look like follows the examples):

Nov 6, 2021 @ 04:11:49.264  Nov 6 03:11:45 OUTER-ROUTER kernel: DROP IN=eth0 OUT= MAC=00:00:00:00:00 SRC=89.248.165.203 DST=aaa.bbb.ccc.ddd LEN=40 TOS=0x00 PREC=0x00 TTL=236 ID=9500 PROTO=TCP SPT=46285 DPT=10030 SEQ=2913663473 ACK=0 WINDOW=1024 RES=0x00 SYN URGP=0

Nov 6, 2021 @ 04:11:42.262  Nov 6 03:11:41 OUTER-ROUTER kernel: DROP IN=eth0 OUT= MAC=00:00:00:00:00 SRC=89.248.165.53 DST=aaa.bbb.ccc.ddd LEN=40 TOS=0x00 PREC=0x00 TTL=236 ID=32581 PROTO=TCP SPT=44022 DPT=3539 SEQ=433883477 ACK=0 WINDOW=1024 RES=0x00 SYN URGP=0

Nov 6, 2021 @ 04:11:41.261  Nov 6 03:11:40 OUTER-ROUTER kernel: DROP IN=eth0 OUT= MAC=00:00:00:00:00 SRC=185.156.73.60 DST=aaa.bbb.ccc.ddd LEN=40 TOS=0x00 PREC=0x00 TTL=237 ID=35192 PROTO=TCP SPT=48759 DPT=50541 SEQ=2162472918 ACK=0 WINDOW=1024 RES=0x00 SYN URGP=

Nov 6, 2021 @ 04:11:41.261  Nov 6 03:11:40 OUTER-ROUTER kernel: DROP IN=eth0 OUT= MAC=00:00:00:00:00 SRC=185.156.73.60 DST=aaa.bbb.ccc.ddd LEN=40 TOS=0x00 PREC=0x00 TTL=237 ID=33070 PROTO=TCP SPT=48759 DPT=41132 SEQ=1411045953 ACK=0 WINDOW=1024 RES=0x00 SYN URGP=0
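Based on some searching, I am guessing the parsing would end up looking roughly like the sketch below. This is only a sketch, not something I have running: the fw_* field names, the "kernel: DROP" conditional, and the grok pattern are my own guesses. The idea is to grok off the syslog header, let a kv filter split the KEY=value pairs (SRC, DST, SPT, DPT, and the rest), and run geoip on the source address:

filter {
  # Only touch the firewall DROP lines; everything else passes through untouched.
  if "kernel: DROP" in [message] {
    grok {
      # Peel off the syslog header and keep the KEY=value block for the kv filter below.
      match => { "message" => "%{SYSLOGTIMESTAMP:fw_timestamp} %{SYSLOGHOST:fw_hostname} kernel: %{WORD:fw_action} %{GREEDYDATA:fw_kv}" }
    }
    # Split "IN=eth0 OUT= MAC=... SRC=... DST=... SPT=... DPT=..." into individual
    # fw_* fields (fw_SRC, fw_DST, fw_SPT, fw_DPT, ...); bare tokens like SYN are skipped.
    kv {
      source => "fw_kv"
      value_split => "="
      prefix => "fw_"
      remove_field => [ "fw_kv" ]
    }
    # Ports as integers so they can be range-filtered and aggregated in Kibana.
    mutate {
      convert => {
        "fw_SPT" => "integer"
        "fw_DPT" => "integer"
      }
    }
    # GeoIP on the (public) source address for a map or dashboard,
    # following the same target style as the existing auth/ssh geoip block.
    geoip {
      source => "fw_SRC"
      target => "fw_geoip"
    }
  }
}

If that indexes as expected, filtering in Discover would then just be a matter of querying the new fields (e.g. fw_SRC or fw_DPT), and the fw_geoip fields should feed a map visualization. Corrections welcome.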

Things are starting to come back to me. Back in 2017, when my interest in ELK first started, someone helped me with a regex filter in the Kibana window. If my memory serves me, it was a _type filter that could parse each message and index the dotted-quad IP address. Alas, I have lost my notes since then and am trying to recreate the same indexing four years later. Does anyone have a working process?