Logstash 8.9.0: How to match messages containing words starting with "sap" (e.g., sapxxx) using a filter?

Could you please help me with a Logstash filter pipeline configuration that can catch messages of the following type:

<30>Dec  5 15:01:05 saphcmdevdb systemd[1]: snapperd.service: Deactivated successfully.

I need to detect any message that contains a word starting with “sap”, for example: sapxxx, sapxxx01, etc.
The word must start with “sap”.
So patterns like xxsapxx or xxxsap should not match.
Only sap* (letters or digits after “sap”) is valid.

I have tried many different regex options in Logstash but none of them worked correctly.

Logstash version: 8.9.0

Welcome to the community @Pan_Vad !

Maybe:

filter {
  if [message] =~ /\bsap\w*/ {
    # your stuff here 
  }
}

\b is a word boundary
sap is the literal prefix
\w* is the rest of the word (letters, digits, and underscores)

EDIT: Maybe it's not your regex that is the problem, maybe something else. It is always helpful to share as much information as possible.
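
For a quick check of what this pattern does and does not match, here is a minimal sketch of a throwaway pipeline. The four test lines are made up for illustration, and the comments describe what the regex semantics imply, not captured output. Note that for a pure yes/no test, /\bsap/ matches the same lines as /\bsap\w*/.

```
input {
  generator {
    # Hypothetical test lines, not real logs:
    #   1. "sap" starts a word, so the pattern should match
    #   2. "sap" sits in the middle of a word, so it should not match
    #   3. "sap" sits at the end of a word, so it should not match
    #   4. "." is not a word character, so \b holds before "sap" and it should match
    lines => [
      "Dec  5 15:01:05 saphcmdevdb test",
      "Dec  5 15:01:05 xxsapxx test",
      "Dec  5 15:01:05 xxxsap test",
      "Dec  5 15:01:05 db.sap01 test"
    ]
    count => 1
  }
}

filter {
  if [message] =~ /\bsap\w*/ {
    mutate { add_tag => ["contains_sap"] }
  }
}

output {
  stdout { codec => rubydebug }
}
```

The fourth line shows the limit of a word boundary: it only guarantees that the character before "sap" is not a letter, digit, or underscore.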

Unfortunately, it doesn’t work. I’ve already tried this option.

OK, I tried it with same logstash version as you:

$ fgrep "logstash.version" logstash-plain.log
[2025-12-05T17:15:10,338][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.9.0", "jruby.version"=>"jruby 9.3.10.0 (2.6.8) 2023-02-01 107b2e6697 OpenJDK 64-Bit Server VM 17.0.7+7 on 17.0.7+7 +indy +jit [x86_64-linux]"}
$ cat /etc/logstash/conf.d/simple.conf
input {
  file {
    path => "/tmp/input.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  if [message] =~ /\bsap\w*/ {
    mutate { add_tag => ["contains_sap"] }
  }
}

output {
  file {
    path => "/tmp/output.log"
  }
}

That is my config.

$ cat >> /tmp/input.log
<30>Dec  5 15:01:05 saphcmdevdb systemd[1]: snapperd.service: Deactivated successfully.
$

and this is what I got in the output file:

$ cat /tmp/output.log
{"event":{"original":"<30>Dec  5 15:01:05 saphcmdevdb systemd[1]: snapperd.service: Deactivated successfully."},"@version":"1","@timestamp":"2025-12-05T16:16:12.001372205Z","host":{"name":"u2024"},"message":"<30>Dec  5 15:01:05 saphcmdevdb systemd[1]: snapperd.service: Deactivated successfully.","tags":["contains_sap"],"log":{"file":{"path":"/tmp/input.log"}}}
$ jq . /tmp/output.log
{
  "event": {
    "original": "<30>Dec  5 15:01:05 saphcmdevdb systemd[1]: snapperd.service: Deactivated successfully."
  },
  "@version": "1",
  "@timestamp": "2025-12-05T16:16:12.001372205Z",
  "host": {
    "name": "u2024"
  },
  "message": "<30>Dec  5 15:01:05 saphcmdevdb systemd[1]: snapperd.service: Deactivated successfully.",
  "tags": [
    "contains_sap"
  ],
  "log": {
    "file": {
      "path": "/tmp/input.log"
    }
  }
}

So, as I noted already, maybe it's not your regex that is the problem, maybe something else? It is always helpful to share as much information as possible.

You can use something like this:


input {
  generator {
       message => '<30>Dec  5 15:01:05 saphcmdevdb systemd[1]: snapperd.service: Deactivated successfully.'
       count => 1
  }
}
filter {
  grok {
    match => { "message" => "<%{INT}>%{SYSLOGLINE}" }
    overwrite => [ "message" ]
  }

  if [host][hostname] =~ /^sap/ { # add (?i) in front of sap for a case-insensitive search
    mutate { add_tag => [ "It_works" ] }
  }

  mutate {
    remove_field => [ "event", "@version", "process", "@timestamp" ]  # "message"
  }
}
output {
    stdout { codec => rubydebug{ }} 
}

Output:

{
      "message" => "snapperd.service: Deactivated successfully.",
    "timestamp" => "Dec  5 15:01:05",
         "host" => {
            "name" => "vipera",
        "hostname" => "saphcmdevdb"
    },
         "tags" => [
        [0] "It_works"
    ]
}

This approach doesn’t work for me because I only have one input and one output, but many filters. In the filters, I intercept logs using conditions like if [host][ip] == "10.0.0.0" or [host][ip] == "192.168.0.0" or [host][ip] == "172.0.0.0" or …. The list is very large. Each device or application has its own filter. But this is inconvenient because every time a new host is added, I have to update the corresponding filter.

The filter with the largest number of if [host][ip] conditions is the one where I want to use matching based on the beginning of the word “sap”, because all hosts start with “sap…”. This will simplify adding new hosts without modifying the pipeline.
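
Independent of the sap question, one way to shorten the long or-chains is the in operator of Logstash conditionals. This is a minimal sketch that reuses the placeholder IPs from above and the gisap tag used in this pipeline; it assumes [host][ip] holds a single string.

```
filter {
  # Placeholder addresses taken from the example condition above
  if [host][ip] in ["10.0.0.0", "192.168.0.0", "172.0.0.0"] {
    mutate { add_tag => ["gisap"] }
  }
}
```

New hosts still mean editing the list, so this only tidies the condition; it does not remove the maintenance that a hostname-prefix match would.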

For reference, here is what the input pipeline looks like:

input {
  tcp {
    type => "syslog"
    port => 1514
  }
}

input {
  udp {
    type => "syslog"
    port => 2514
  }
}

And here is the output pipeline:

output {
  if [type] == "syslog" {
    if "gisap" in [tags] {
      elasticsearch {
        hosts => ["https://elastic01.avgust.com:9200", "https://elastic02.avgust.com:9200", "https://elastic03.avgust.com:9200", "https://elastic04.avgust.com:9200"]
        index => "gisap-syslog-%{+dd.MM.YYYY}"
        ssl => true
        ssl_certificate_verification => true
        cacert => "/etc/logstash/root.pem"
        user => "xxx"
        password => "xxx"
      }
    }
    else if "gtsr" in [tags] {
      elasticsearch {
        hosts => ["https://elastic01.avgust.com:9200", "https://elastic02.avgust.com:9200", "https://elastic03.avgust.com:9200", "https://elastic04.avgust.com:9200"]
        index => "gtsr-syslog-%{+dd.MM.YYYY}"
        ssl => true
        ssl_certificate_verification => true
        cacert => "/etc/logstash/root.pem"
        user => "xxx"
        password => "xxx"
      }
    }
    else {
      elasticsearch {
        hosts => ["https://elastic01.avgust.com:9200", "https://elastic02.avgust.com:9200", "https://elastic03.avgust.com:9200", "https://elastic04.avgust.com:9200"]
        index => "logs-%{+dd.MM.YYYY}"
        ssl => true
        ssl_certificate_verification => true
        cacert => "/etc/logstash/root.pem"
        user => "xxx"
        password => "xxx"
      }
    }
  }
}

I have tried many options, for example:

filter {
  if [message] =~ /(?i)\bsap[a-z0-9]+\b/ {
    mutate { add_tag => ["gisap"] }
  }
}

Please help me solve this problem.

Is sap* always located right after the timestamp?

Yes, always. All messages are in standard syslog format. Structure:

"<PRI> <time> <host name> <program>: message text..."
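
Given that fixed layout, the check could also be anchored to the hostname position directly in the conditional, without parsing anything into fields first. This is a minimal sketch, assuming the traditional "Mmm d hh:mm:ss" timestamp seen in the sample line and hostnames made only of letters and digits; the gisap tag is the one from the output section above.

```
filter {
  # <PRI>, month, day, time, then a hostname that begins with "sap"
  if [message] =~ /^<\d+>[A-Za-z]{3}\s+\d{1,2} \d{2}:\d{2}:\d{2} sap[A-Za-z0-9]*\s/ {
    mutate { add_tag => ["gisap"] }
  }
}
```

A line where sap appears only later, for example in the program name, is not matched by this pattern, because the hostname slot is checked explicitly. If the devices send ISO 8601 timestamps instead, the timestamp part of the pattern would need to change.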

RainTown's suggestion works fine; it searches for the sap string at any position, so please check it.
Here is a sample that uses grok to parse the syslog message. If you don't want the extra fields, remove them or use @metadata for temporary fields. The only difference is that with grok the sap check is applied at the right position.

input {
  generator {
       #message => '<30>Dec  5 15:01:05 192.168.0.0 systemd[1]: snapperd.service: Deactivated successfully.' # hostname is ip 
       #message => '<30>Dec  5 15:01:05 aphcmdevdb systemd[1]: snapperd.service: Deactivated successfully.'  # hostname doesn't have sap*
       #message => '<30>Dec  5 15:01:05 aaphcmdevdb systemd[1]: sapperd.service: Deactivated successfully.'  # line has sap* on another place
       message => '<30>Dec  5 15:01:05 saphcmdevdb systemd[1]: snapperd.service: Deactivated successfully.'  # your sample
       count => 1
  }
}
filter {
  # parse the syslog line into @metadata or real fields
  grok {
    match => { "message" => "<%{INT}>(?:%{SYSLOGTIMESTAMP:[@metadata][timestamp]}|%{TIMESTAMP_ISO8601:[@metadata][timestamp]})(?: %{SYSLOGFACILITY})?(?: %{HOSTNAME:[@metadata][hostname]})?(?: %{SYSLOGPROG}:)? %{GREEDYDATA:[@metadata][msg]}" }
    #overwrite => [ "message" ]
  }

  if [@metadata][hostname] =~ /^sap/ { # add (?i) in front of sap for a case-insensitive search
    mutate { add_field => { "[@metadata][saphost]" => true } }
  }
  else { mutate { add_field => { "[@metadata][saphost]" => false } } }

  # what RainTown suggested
  if [message] =~ /\bsap\w*/ {
    mutate { add_field => { "[@metadata][sap]" => true } }
  }
  else { mutate { add_field => { "[@metadata][sap]" => false } } }

  mutate {
    remove_field => [ "event", "@version", "@timestamp", "process", "timestamp", "host" ]  # "message"
  }
}
output {
    stdout { codec => rubydebug{ metadata => true}} 
}

Of course, you can simplify the grok pattern to extract only the hostname, but this version parses the whole line and validates it as well.

I cannot use Grok parsing first because it would apply to other filters. My structure is that each filter is a separate file. Each filter file contains configuration for a specific device group. For example, hosts with IPs 192.168.0.1-192.168.0.10 are Linux servers, 192.168.0.11-192.168.0.20 are Windows servers, and so on. In all filters, I use a condition like
if [host] =~ "sap*" or [host][ip] == "10.0.0.0" or [host][ip] == "10.0.0.1" or ... and so on. At the end of the filter, I add a tag field so that in Elasticsearch I can see which host group the logs came from.

Currently, my "host" field is an array that outputs both the hostname and IP. This is inconvenient. I'm trying to get the filter into the desired state. Could you please advise: if the "host" field in the index is already structured as an array, and I'm trying to apply a filter like:

filter {
if [message] =~ /\bsap\w*/ {
# your stuff here
}
}

Could this be the mistake?
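
Two things in the [host] =~ "sap*" condition quoted above are worth checking. A quoted "sap*" is treated as a regular expression, not a wildcard: it means "sa" followed by zero or more "p", anywhere in the value. A regex comparison is also only meaningful against a single string, so if host holds several values, the condition should point at the subfield that carries the name. This is a minimal sketch, assuming the hostname is available as [host][name]; that field name is a guess, so check the real event with a rubydebug output first.

```
filter {
  # [host][name] is an assumed field name; adjust it to the actual event structure
  if [host][name] =~ /^sap[A-Za-z0-9]*$/ {
    mutate { add_tag => ["gisap"] }
  }
}
```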

Can you share the entire Logstash configuration? It is really hard to understand what your configuration is doing without seeing it.

What do you mean by this? None of the examples you shared look like this.

You said your messages are in this format:

"<PRI> <time> <host name> <program>: message text..."

So if you want to filter based on the host name, then it is really simple.

First you need to isolate the host name in a dedicated field, which you can do with a dissect filter. Then you check whether that field starts with sap.

Something like this:

filter {
    dissect {
        # skip PRI, month, day and time; keep only the host name
        match => {
            "message" => "<%{}>%{} %{}:%{}:%{} %{[@metadata][hostname]} %{}: %{}"
        }
    }

    if [@metadata][hostname] =~ /^sap/ {
        # your filters here
    }
}

You probably want to obfuscate your hostnames

elastic01.avgust.com

Domain Name: AVGUST.COM
Registry Domain ID: 448982_DOMAIN_COM-VRSN
Registrar WHOIS Server: whois.nic.ru
Registrar URL: http://www.nic.ru
...
Registrant Name: Joint Stock Company "Firma August"
Registrant Organization: JSC AUGUST Inc.
Registrant Street: 6, Tsandera str
Registrant City: Moscow
Registrant State/Province: Russian Federation
Registrant Postal Code: 129515
Registrant Country: RU

I can't help you due to this.