Filter by event.code and divide message field

Hello!

I have Logstash 7.1 and Winlogbeat 7.1. I'm trying to get 4624 events from domain controllers, split the message field into multiple fields, and then remove everything I don't need.

I have 2 problems there:

  1. I started by trying to just remove the message field from 4624 events, but couldn't even do that. If I use a filter like this:

    if "windc" in [tags] and [event.code] == 4624 {
        mutate {
        remove_field => ["message"]
        }
     }
    

The message field is not deleted; nothing happens at all. I've tried many different variations of the condition, such as:

[event.code] == 4624
"event.code" == 4624
"4624" in "[event][code]" 
[event.code] = 4624
"[event][code]" == 4624

but none of them worked for me. But if I try to remove that field with

if "windc" in [tags]  {
        mutate {
        prune {
       remove_field => ["message","[event][code]"]
 }

everything works as it should. What am I doing wrong?

  2. Is it possible to split this field into smaller fields, one line at a time, or something similar?

  1. You tried everything but the correct syntax, I guess:
    if "windc" in [tags] and [event][code] == 4624 { ...
  2. I don't understand the question. What is the content of the field and what should the result look like?
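
To illustrate point 1, here is a rough Ruby analogy (not Logstash's actual implementation) of why `[event.code]` finds nothing while `[event][code]` works. The `event` hash below is a made-up stand-in for a Winlogbeat event; the assumption is that the event stores `code` as a key nested inside an `event` object rather than as a flat key named `event.code`.

```ruby
# Hypothetical stand-in for a Logstash event: nested hashes, not flat dotted keys.
event = {
  'tags'    => ['windc'],
  'event'   => { 'code' => 4624 },
  'message' => 'An account was successfully logged on.'
}

# Roughly what [event.code] asks for: one top-level key literally named "event.code".
flat_lookup = event['event.code']

# Roughly what [event][code] asks for: the key "code" inside the object "event".
nested_lookup = event.dig('event', 'code')

puts flat_lookup.inspect    # prints nil
puts nested_lookup.inspect  # prints 4624
```

Since the flat lookup returns nothing, a condition built on it never matches, which would explain why the filter silently did nothing.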

Well, I tried probably everything, or 99% of the possible options, but really missed yours. Thanks, it works!

There is a lot of information in that log that I don't need, like

Subject:
	Security ID:		S-1-0-0
	Account Name:		-
	Account Domain:		-
	Logon ID:		0x0

Logon Information:
	Logon Type:		3
	Restricted Admin Mode:	-
	Virtual Account:		No
	Elevated Token:		Yes

Impersonation Level:		Impersonation
Process Information:
	Process ID:		0x0
	Process Name:		-

Detailed Authentication Information:
	Logon Process:		Kerberos
	Authentication Package:	Kerberos
	Transited Services:	-
	Package Name (NTLM only):	-
	Key Length:		0

I only need Account Name, Source Network Address and maybe Logon Type, but I don't know how to remove everything else.

I saw one solution for the same question: use grok on the message field. But I'm afraid the pattern will be too big.

Here's a ruby code suggestion for the data extraction that tries to parse your message (in a very simple way) and then decides which fields to save. Someone else might have a more elegant solution.

#parsing data
message_content = Hash.new()
str = event.get('message')
fieldname = nil
str.each_line do |line|
  next if line =~ /^$/ #empty line
  if !(line =~ /^\t/) then #line without indentation = root level
      fieldname = line.scan(/^(.*):/).first.first #the field name is the text before the ':'
      message_content[fieldname] = line.scan(/:\t(.*)$/).first.first if !(line =~ /:$/) #root level element with a value instead of children, so the value is behind the ':'
  elsif !fieldname.nil? #child element
    message_content[fieldname] = Hash.new if message_content[fieldname].nil? #create hash if it doesn't exist
    message_content[fieldname][line.scan(/^\t*(.*?):/).first.first] = line.scan(/:\t*(.*)$/).first.first #child element field name and value are before and after the ':'
  end
end

#collecting interesting fields
wanted_fields = Hash.new();
wanted_fields['account_name'] = message_content['Subject']['Account Name'] if defined?message_content['Subject']['Account Name']
wanted_fields['network_address'] = message_content['Whatever']['Source Network Address'] if defined?message_content['Whatever']['Source Network Address']
wanted_fields['logon_type'] = message_content['Logon Information']['Logon Type'] if defined?message_content['Logon Information']['Logon Type']
event.set('wanted_fields', wanted_fields)

I think I found a solution: https://www.elastic.co/guide/en/logstash/current/plugins-filters-split.html. I would just need to join the resulting documents back into a single one, without the strings that I don't need.

Thanks. That looks pretty good too. I'll try both options. If you know how I can rejoin the documents after the split plugin, but only with the fields that I need, I would really appreciate it.

Re-joining events is not a trivial problem at all, so I would avoid splitting the event up if you can help it.

@Jenni's code looks like a fantastic start. I would recommend using the parser bit to place the entire parsed result into a @metadata field (which is a part of the event but not typically included in outputs), and then using Logstash's mutate filter to extract the fields that you need.

e.g.,

filter {
  ruby {
    code => "
      message_content = Hash.new()
      str = event.get('message')
      fieldname = nil
      str.each_line do |line|
        next if line =~ /^$/ #empty line
        if !(line =~ /^\t/) then #line without indentation = root level
            fieldname = line.scan(/^(.*):/).first.first #the field name is the text before the ':'
            message_content[fieldname] = line.scan(/:\t(.*)$/).first.first if !(line =~ /:$/) #root level element with a value instead of children, so the value is behind the ':'
        elsif !fieldname.nil? #child element
          message_content[fieldname] = Hash.new if message_content[fieldname].nil? #create hash if it doesn't exist
          message_content[fieldname][line.scan(/^\t*(.*?):/).first.first] = line.scan(/:\t*(.*)$/).first.first #child element field name and value are before and after the ':'
        end
      end
      event.set('[@metadata][parsed]', message_content)
    "
  }

  mutate {
    copy => {
      "[@metadata][parsed][Subject][Account Name]"         => "[account_name]"
      "[@metadata][parsed][Logon Information][Logon Type]" => "[logon_type]"
    }
  }
}


Thank you very much for your reply. But when I use your filter I get an error:

[2019-12-13T15:59:42,589][ERROR][logstash.filters.ruby ] Ruby exception occurred: undefined method `first' for nil:NilClass

I've deleted everything from the filter section except your code.

What did the file look like that should have been parsed? I kept my example very close to your example to save time instead of creating a recursive function for more levels of data or thinking of potential exceptions in the formatting. So maybe your full data contains something that cannot be matched with this simple code. I just assumed that the regex would always match; if it doesn't, `scan` returns an empty array, `.first` returns nil, and this error occurs.
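
A more defensive variant could skip any line that doesn't match instead of raising. The following is only a sketch under a few assumptions: the helper name `parse_message` is made up, sections are assumed to be indented with tabs as in the sample above, and only one level of nesting is handled. The sample text is shortened and partly invented for illustration.

```ruby
# Hypothetical helper: parses "Key:<tab>Value" lines into a nested hash.
# Lines that do not look like "key: value" are skipped instead of raising.
def parse_message(str)
  parsed = {}
  section = nil
  str.to_s.each_line do |raw|
    line = raw.chomp                      # also strips a trailing "\r\n"
    next if line.strip.empty?             # empty line
    match = line.match(/^(\t*)([^:]+):\s*(.*)$/)
    next if match.nil?                    # e.g. a free-text sentence without ':'
    indent, key, value = match.captures
    key = key.strip
    value = value.strip
    if indent.empty?                      # root level
      if value.empty?
        section = key                     # section header such as "Subject:"
        parsed[section] ||= {}
      else
        section = nil                     # root-level element with its own value
        parsed[key] = value
      end
    elsif section                         # child element of the current section
      parsed[section][key] = value
    end
  end
  parsed
end

sample = "An account was successfully logged on.\r\n\r\n" \
         "Subject:\r\n\tAccount Name:\t\t-\r\n\r\n" \
         "Logon Information:\r\n\tLogon Type:\t\t3\r\n\r\n" \
         "Impersonation Level:\t\tImpersonation\r\n"

result = parse_message(sample)
# Hash#dig returns nil for missing sections instead of raising.
puts result.dig('Subject', 'Account Name').inspect
puts result.dig('Logon Information', 'Logon Type').inspect
puts result.dig('Whatever', 'Source Network Address').inspect
```

Inside a ruby filter, the same logic would read the input with `event.get('message')` and store the result with `event.set`, as in the examples above.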

Colleagues, I have to admit, it was an unfortunate mistake.
By default, winlogbeat already splits the message field into the desired fields. I didn't notice this because we had previously introduced a filter that removed these new fields and left only the message field. This is convenient for all other types of Windows logs, but not for 4626, 4776. I'm ashamed.
But I am sure that your code can be useful in other cases and will help a lot of people. Thank you again for your help.

BTW, I want to share my solution with you:

I've added this to winlogbeat:

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - drop_fields:
      fields: [winlog.event_data.ElevatedToken, winlog.event_data.ImpersonationLevel, winlog.event_data.KeyLength, winlog.event_data.LmPackageName, winlog.event_data.LogonGuid, winlog.event_data.RestrictedAdminMode, winlog.event_data.TargetLinkedLogonId, winlog.provider_guid, winlog.process.pid, winlog.event_data.TargetLogonId, winlog.event_data.TargetOutboundDomainName, winlog.event_data.TargetOutboundUserName, winlog.event_data.TargetUserSid, winlog.event_data.TransmittedServices, winlog.event_data.VirtualAccount, winlog.keywords, winlog.opcode, winlog.provider_name, winlog.record_id, winlog.process.thread.id, winlog.version]

and this to the filter section in Logstash:

# Domain Controllers logs filtering

    if "windc" in [tags] and [event][code] == 4776 and [winlog][event_data][Status] == "0x0" {
        drop {}
    }

    if "windc" in [tags] and [event][code] == 4776 {
        prune {
            remove_field => ["[agent][id]","[agent][ephemeral_id]","[agent][hostname]","[agent][type]","[agent][version]","[ecs][version]","[event][kind]","[host][architecture]","[host][hostname]","[host][os][build]","[host][os][family]","[host][os][kernel]","[host][os][platform]","[host][os][version]"]
            blacklist_names => ["^.*winlog.*"]
        }
    }

# Domain Controllers logon events

    if "windc" in [tags] and [event][code] == 4624 and ([winlog][event_data][TargetUserName] == "admin1" or [winlog][event_data][TargetUserName] == "admin2") {
        drop {}
    }

    if "windc" in [tags] and [event][code] == 4624 {
        prune {
            remove_field => ["[agent][id]","[agent][ephemeral_id]","[agent][hostname]","[agent][type]","[agent][version]","[ecs][version]","[event][kind]","[host][architecture]","[host][hostname]","[host][os][build]","[host][os][family]","[host][os][kernel]","[host][os][platform]","[host][os][version]"]
            blacklist_names => ["message"]
        }
    }

Some fields that we never need can be deleted via an ingest pipeline:

  "remove-fields" : {
    "description" : "remove a exchange of fields",
    "processors" : [
      {
        "remove" : {
          "field" : [
            "agent.ephemeral_id",
            "agent.hostname",
            "agent.id",
            "agent.type",
            "agent.version",
            "ecs.version",
            "input.type",
            "log.offset",
            "log.file.path",
            "version"
          ],
          "ignore_failure" : true
        }
      }
    ]
  }

It looks a bit bulky, but it works OK. Some of the filtering in Logstash could probably be done with a whitelist instead, but I couldn't make that work.
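
For what it's worth, the whitelist idea can be sketched in plain Ruby, which might be adapted for a ruby filter. This is not the prune filter's own whitelist option, just an illustration of keeping a fixed set of nested fields. The helper name `keep_only`, the sample document and its values (such as `jdoe`) are made up, and `LogonType` is assumed to be among the fields Winlogbeat extracts for 4624 events.

```ruby
# Hypothetical helper: builds a new hash containing only the listed nested paths.
# Paths that are missing from the source are silently skipped.
def keep_only(source, paths)
  paths.each_with_object({}) do |path, kept|
    value = source.dig(*path)
    next if value.nil?
    *parents, leaf = path
    # Walk down (creating intermediate hashes as needed) to where the leaf belongs.
    target = parents.reduce(kept) { |node, key| node[key] ||= {} }
    target[leaf] = value
  end
end

# Made-up document shaped like a Winlogbeat 4624 event.
doc = {
  'message' => 'An account was successfully logged on.',
  'event'   => { 'code' => 4624, 'kind' => 'event' },
  'winlog'  => {
    'event_data' => { 'TargetUserName' => 'jdoe', 'LogonType' => '3', 'KeyLength' => '0' }
  }
}

wanted = [
  %w[event code],
  %w[winlog event_data TargetUserName],
  %w[winlog event_data LogonType]
]

slim = keep_only(doc, wanted)
puts slim.inspect
```

The sketch assumes every intermediate value on a path is a hash; `dig` raises a TypeError if it hits a string or number partway down.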