Veeam Logstash Grok

Hello,

I'm trying to grok logs flowing from Veeam log files > Filebeat filestream input > Logstash so I can build a better filter on the logs collected from Veeam. I'm relatively new to grok and I'm stuck on how to parse the log message below. Could one of you suggest how to grok this log format from the Veeam log files?

	[08.08.2022 13:18:32] <17> Error           at Veeam.Backup.ProxyProvider.CProxyRpcInvoker.Call(String methodName, CProxyInvokeInputArg inputArgs, Boolean secureCall)

What fields do you want to extract from the log message?

Hello,

Thanks for your reply! I look forward to you educating me on this. I want to extract:

Date: [08.08.2022 13:18:32]
ID: <17>
Log level: Error

I wrote the first part of the grok for the timestamp, and the field now shows up in Elasticsearch and Kibana. Once I have the filebeat.yml and the Logstash pipeline running as they should, I'll post a thread with the details to share back.

    if "veeam" in [tags] {
grok {
     match => { "message" => "%{DATESTAMP:timestamp}"}
        }
    }
}
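
Not part of my config yet, but a date filter could turn that captured timestamp string into @timestamp. This is just a sketch assuming the dd.MM.yyyy HH:mm:ss format from the sample line:

date {
  # assumes the field captured by DATESTAMP above is named "timestamp"
  match => [ "timestamp", "dd.MM.yyyy HH:mm:ss" ]
}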

I would use dissect for that rather than grok

dissect { mapping => { "message" => "[%{date}] <%{id}> %{level} %{}" } }

But if you insist on using grok then

grok { match => { "message" => "^(?<date>[\[\]\d.: ]+) %{NOTSPACE:id} %{WORD:level}" } }

Awesome, I didn't even think to use dissect. I'll try both and post the result!

That did the trick, thank you very much Badger! Now, how do I create a field for the string after the log level?

[09.08.2022 16:40:17] <104> Info         ==  Advanced: <AutoScheduleOptions PerformActiveFullBackup="False" ActiveFullBackupKind="0" ActiveFullBackupDays="" PerformTransformToSyntethic="False" TransformToSyntethicDays=""><ActiveFullBackupMonthlyOption><FullBackupMonthlyScheduleOptions /></ActiveFullBackupMonthlyOption></AutoScheduleOptions>

That final %{} matches the rest of the line, but does not save it as a field. You could change that to

dissect { mapping => { "message" => "[%{date}] <%{id}> %{level} %{someField}" } }

I want to give back to the community as Badger supported me. I have included the filebeat.yml file from my Veeam Backup and Replication server, as well as the Logstash pipeline configuration file. I expect this will help someone in the future with collecting logs from Veeam. I'll write something up later on and post a thread.

The filebeat.yml includes regexes so that only Warning, Error and Info log messages are extracted from the log files.

###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

# filestream is an input for collecting log messages from files.
- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: veeam

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - C:\ProgramData\Veeam\Backup\Utils\DeleteBackup\*.log
    - C:\ProgramData\Veeam\Backup\Svc.VeeamBackup*.log

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ['^DBG']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  include_lines: ['\[\d+.\d+.\d+\s\d+\:\d+:\d+]\s\<\d+\>\sWarning', '\[\d+.\d+.\d+\s\d+\:\d+:\d+]\s\<\d+\>\sError', '\[\d+.\d+.\d+\s\d+\:\d+:\d+]\s\<\d+\>\sInfo']
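  # A single pattern with alternation should be equivalent (untested suggestion):
  #include_lines: ['\[\d+.\d+.\d+\s\d+\:\d+:\d+]\s\<\d+\>\s(Warning|Error|Info)']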

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #prospector.scanner.exclude_files: ['.gz$']

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

# ============================== Filebeat modules ==============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s


# ================================== General ===================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
tags: ["veeam"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging


# ================================== Outputs ===================================


# ------------------------------ Logstash Output -------------------------------
output.logstash:
  # The Logstash hosts
  hosts: [":5044", ":5044", ":5044"]
  loadbalance: true
  worker: 3

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  ssl.certificate_authorities: C:\ProgramData\

  # Certificate for SSL client authentication
  ssl.certificate: C:\ProgramData\

  # Client Certificate Key
  ssl.key: C:\ProgramData\Elastic\
  
  ssl.verification_mode: certificate

# ================================== Logging ===================================

# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publisher", "service".
#logging.selectors: ["*"]

The Logstash pipeline configuration file:

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate_authorities => ["/etc/root-ca.pem"]
    ssl_certificate => "/etc/client.pem"
    ssl_key => "/etc/client-key.pem"
  }
}

filter {
    if "iis" in [tags] {
grok {
     match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{IPORHOST:S-IP} %{WORD:CS-Method} %{URIPATH:CS-URI-Stem} %{NOTSPACE:cs-uri-query} %{NUMBER:S-Port} %{NOTSPACE:CS-Username} %{IPORHOST:C-IP} %{NOTSPACE:CS-UserAgent} %{NOTSPACE:CS-Referer} %{NUMBER:SC-Status} %{NUMBER:SC-SubStatus} %{NUMBER:SC-Win32-Status} %{NUMBER:Time-Taken}"}
        }
    }
    if "veeam" in [tags] {
dissect  {
     mapping => { "message" => "[%{date}] <%{id}> %{level} %{event}" }
        }
    }
}

output {
  elasticsearch{
    hosts => 
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}

When I add this to my pipeline, Logstash doesn't extract data from the log.

Could it be a tab after the log level? If so, you would need a literal tab in the dissect expression, or else

mutate { gsub => [ "message", "\t", " " ] }
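
For example, a sketch with a literal tab character typed between %{level} and %{someField} (by default the Logstash config parser does not expand \t inside double-quoted strings unless config.support_escapes is enabled in logstash.yml, so it has to be an actual tab):

dissect { mapping => { "message" => "[%{date}] <%{id}> %{level}	%{someField}" } }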

Thanks again. Adding the tab after the log level didn't work. I need some clarification: do I add the mutate first and then the %{someField} to the dissect string, like this?

#First
mutate { gsub => [ "message", "\t", " " ] }

#Second
dissect { mapping => { "message" => "[%{date}] <%{id}> %{level} %{someField}" } }

I'm pretty sure it is a space after the %{level}, because my filebeat.yml contains include_lines: ['\[\d+.\d+.\d+\s\d+\:\d+:\d+]\s\<\d+\>\sWarning',

Yes, the mutate+gsub should be before the dissect. Because dissect needs the separators to match exactly, and it does not support regexps like grok's %{SPACE}, I sometimes find it useful to collapse all whitespace down to a single space:

mutate { gsub => [ "message", "\s+", " " ] }

Badger, you are quite skilled! That did the trick; the extra whitespace is removed from the message field:

message [10.08.2022 14:33:16] <44> Info [RTS] Rescheduled tasks count statistics: [Completed: 0], [Pending: 0], [Preparing: 0], [Ready: 0], [ResponseGiven: 0], [WaitingForComplete: 0]

I'm now running into a parsing exception when I use the filter.
Error:

status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [event] tried to parse field [event] as object, but found a concrete value"}}}}

Filter:

    if "veeam" in [tags] {
mutate { 
     gsub => [ "message", "\s+", " " ] }
dissect  {
     mapping => { "message" => "[%{date}] <%{id}> %{level} %{event}"} 
        }
    }
}

Read this thread to understand what that error is telling you.

You most likely indexed some documents that contain [event][original] (so [event] is an object), since with ECS compatibility enabled Logstash stores a copy of the [message] field there.

The quickest fix will be to use a different field name in your dissect.
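
For example (veeam_event is just an illustrative name; anything that does not collide with the ECS [event] object will work):

dissect { mapping => { "message" => "[%{date}] <%{id}> %{level} %{veeam_event}" } }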
