Logstash parse windows event nxlog

Hello,

I am using nxlog to send Windows event log data to my ELK stack.
I would like to parse the "message" field. Here is an example:

    "message": [
      "2022-04-25 10:54:13 - DESKTOP-QMCS5UA - Security -  INFO - 4672 - Privilèges spéciaux attribués à la nouvelle ouverture de session.  Sujet :  ID de sécurité :  S-1-5-18  Nom du compte :  Système  Domaine du compte :  AUTORITE NT  ID d’ouverture de session :  0x3E7  Privilèges :  SeAssignPrimaryTokenPrivilege    SeTcbPrivilege    SeSecurityPrivilege    SeTakeOwnershipPrivilege    SeLoadDriverPrivilege    SeBackupPrivilege    SeRestorePrivilege    SeDebugPrivilege    SeAuditPrivilege    SeSystemEnvironmentPrivilege    SeImpersonatePrivilege    SeDelegateSessionUserImpersonatePrivilege\r"
    ],

My nxlog configuration is:

define ROOT     C:\Program Files\nxlog
define CERTDIR  %ROOT%\cert
define CONFDIR  %ROOT%\conf
define LOGDIR   %ROOT%\data
define LOGFILE  %LOGDIR%\nxlog.log
define EVENT_SYS	1074, 41, 6006, 6008, 7036
define EVENT_SEC	4624, 4625, 4608, 4609, 4634, 4672, 4673, 4720, 4741, 4726, 4743, 4740, 4738, \
					4742, 4781, 4727, 4731, 4744, 4754, 4759, 4783, 4790, 4730, 4734, 4744, 4754, \
					4759, 4783, 4748, 4753, 4758, 4763, 4789, 4792, 4735, 4737, 4745, 4750, 4755, \
					4760, 4784, 4791, 4728, 4732, 4746, 4751, 4785, 4787, 4729, 4733, 4747, 4752, \
					4757, 4762, 4786, 4788, 4704, 4705
LogFile %LOGFILE%

Moduledir %ROOT%\modules
CacheDir  %ROOT%\data
Pidfile   %ROOT%\data\nxlog.pid
SpoolDir  %ROOT%\data

<Extension _syslog>
    Module      xm_syslog
</Extension>

<Extension _charconv>
    Module      xm_charconv
    AutodetectCharsets iso8859-2, utf-8, utf-16, utf-32
</Extension>

<Extension _exec>
    Module      xm_exec
</Extension>

<Extension _fileop>
    Module      xm_fileop

    # Check the size of our log file hourly, rotate if larger than 5MB
    <Schedule>
        Every   1 hour
        Exec    if (file_exists('%LOGFILE%') and \
                   (file_size('%LOGFILE%') >= 5M)) \
                    file_cycle('%LOGFILE%', 8);
    </Schedule>

    # Rotate our log file every week on Sunday at midnight
    <Schedule>
        When    @weekly
        Exec    if file_exists('%LOGFILE%') file_cycle('%LOGFILE%', 8);
    </Schedule>
</Extension>

# Snare compatible example configuration
# Collecting event log
 <Input in>
     Module      im_msvistalog
	 <QueryXML>
		<QueryList>
			<Query Id='0'>
				<Select Path="Security">*[System[(Level=1  or Level=2 or Level=3 or Level=4 or Level=0)]]</Select>
				<Select Path="Application">*[System[(Level=1  or Level=2 or Level=3)]]</Select>
				<Select Path="System">*[System[(Level=1  or Level=2 or Level=3 or Level=4 or Level=0)]]</Select>
			</Query>
		</QueryList>
	</QueryXML>
	<Exec>
		$Sec = "Security";
		$Sys = "System";
		$Op = "Informations";
		if ($Channel == $Sys and $SeverityValue == 2 and ($EventID NOT IN (%EVENT_SYS%))) drop();
		if ($Channel == $Sec and $Opcode == $Op and ($EventID NOT IN (%EVENT_SEC%))) drop();
		if $Hostname =~ /(?x)^(\S+)\.(\S+)\.(\S+)/ {$Hostname = $1;}
		$Message_save = $Message;
		$Message_save =~ s/(\t|\R)/ /g;
		$raw_event = $EventTime + " - " + $Hostname + " - " + $Channel + " -  " + $Severity + " - "+ $EventID + " - " + $Message_save;
	</Exec>
 </Input>


<Output out>
	Module 		om_tcp
	Host 		XX.XX.XX.XX
	Port		XXXX
</Output>

# Connect input 'in' to output 'out'
 <Route 1>
     Path        in => out
 </Route>

I can't change the nxlog configuration; I can only change the Logstash configuration.
The Logstash configuration is:

input {
  tcp {
    port => XXXX
  }
}
filter {
  mutate {
    gsub => ["message", "\t", ""]
  }
  kv {
    source => "message"
    value_split => ":"
    field_split => "\n"
  }
}
output {
  elasticsearch { hosts => ["XXXXXXX"] }
  stdout { codec => json }
}

But that doesn't work.

Can you help me ?

Firstly, message is an array, so you should configure your kv filter with a source of "[message][0]".

It is unclear why you would set field_split to "\n". If your message really is delimited by newlines, as in

                     "event" => {
    "original" => "{ \"message\": [ \"2022-04-25 10:54:13 - DESKTOP-QMCS5UA - Security -  INFO - 4672 - Privilèges spéciaux attribués à la nouvelle ouverture de session.  Sujet :  \nID de sécurité :  S-1-5-18  \nNom du compte :  Système  \nDomaine du compte :  AUTORITE NT  \nID d’ouverture de session :  0x3E7  \nPrivilèges :  SeAssignPrimaryTokenPrivilege    SeTcbPrivilege    SeSecurityPrivilege    SeTakeOwnershipPrivilege    SeLoadDriverPrivilege    SeBackupPrivilege    SeRestorePrivilege    SeDebugPrivilege    SeAuditPrivilege    SeSystemEnvironmentPrivilege    SeImpersonatePrivilege    SeDelegateSessionUserImpersonatePrivilege\\r\" ] }"
},

then I would use

    mutate { gsub => [ "message", "\n", ";" ] }
    json { source => "message" }
    kv { source => "[message][0]" value_split => ":" field_split => ";" trim_value => "\s" }

which produces

             "2022-04-25 10" => "54:13 - DESKTOP-QMCS5UA - Security -  INFO - 4672 - Privilèges spéciaux attribués à la nouvelle ouverture de session.  Sujet :",
           "ID de sécurité " => "S-1-5-18",
"ID d’ouverture de session " => "0x3E7",
            "Nom du compte " => "Système",
               "Privilèges " => "SeAssignPrimaryTokenPrivilege    SeTcbPrivilege    SeSecurityPrivilege    SeTakeOwnershipPrivilege    SeLoadDriverPrivilege    SeBackupPrivilege    SeRestorePrivilege    SeDebugPrivilege    SeAuditPrivilege    SeSystemEnvironmentPrivilege    SeImpersonatePrivilege    SeDelegateSessionUserImpersonatePrivilege",
        "Domaine du compte " => "AUTORITE NT",

However, if it really is whitespace-delimited as you show, then I think a string in which both keys and values contain whitespace is too ambiguous for a kv filter to parse. Maybe grok?

Hello,

Thanks for your answer.
Indeed, it is whitespace-delimited and not "\n"...
I am not sure how to use grok in my case.
Moreover, I need to parse several different events, so the fields are not the same for each.

I have tried

filter {
  grok {
    match => ["message", "%{LOGLEVEL:log_level}"]
  }
}

That works: I get the log level (INFO, Warning, ...), but I don't know how to continue for each field.

If you know what the field names are going to be then you could do something like this. If the field names are unknown I do not think there is an unambiguous way to parse the message.
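For example, something like this might work (a sketch only, using the labels from your sample event; note that since values such as "AUTORITE NT" contain spaces, %{NOTSPACE} will only capture the first word):

    filter {
      grok {
        break_on_match => false
        match => {
          "message" => [
            "Nom du compte%{SPACE}:%{SPACE}%{NOTSPACE:Nom_Compte}",
            "Domaine du compte%{SPACE}:%{SPACE}%{NOTSPACE:Domaine_Compte}"
          ]
        }
      }
    }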

Hi Badger,

Thanks for your help.

I have tried this solution:

input {
  tcp {
    port => XXX
  }
}

filter {
  grok {
    break_on_match => false
    match => {
      "message" => [
        "Nom du compte%{SPACE}:%{SPACE}(?<Nom_Compte>)%{SPACE}",
        "Domaine du compte%{SPACE}:%{SPACE}(?<Domaine_Compte>)%{SPACE}"
      ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
}

But my event doesn't change in Kibana. I just get a grok parse failure.

Do you know why ?
Thanks a lot.

You have not included anything in the custom pattern. It does not match anything.

I don't understand.
Do I need to specify each account name?
To me, (?<Nom_Compte>) was there to capture any account name.

Thanks,

Read the documentation for custom patterns. <Nom_Compte> names the capture, but the pattern itself (everything between the > and the )) is empty, so it matches nothing. Perhaps (?<Nom_Compte>[^ ]+), which will match one or more non-space characters.
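Applied to your filter, that would give something like this (a sketch based on the two fields from your configuration):

    grok {
      break_on_match => false
      match => {
        "message" => [
          "Nom du compte%{SPACE}:%{SPACE}(?<Nom_Compte>[^ ]+)",
          "Domaine du compte%{SPACE}:%{SPACE}(?<Domaine_Compte>[^ ]+)"
        ]
      }
    }

Keep in mind that [^ ]+ stops at the first space, so a multi-word value like "AUTORITE NT" will only be partially captured; those fields would need a more specific pattern.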
