Trying to parse XML logs

Hello,
I'm currently trying to parse an XML log. I have worked with Logstash over the past month to parse simple single-line and multiline logs, so I'm familiar with the input{} / filter{} / output{} structure.
Now I want to parse an XML file that looks like this:

<WartungNG>
	<BBMAG1100>
		<Ereignisliste>
			<Application>
				<Ereignis Nr.="1" Protokollname="Application" Quelle="SQLSERVERAGENT" Ebene="Warnung" Id="208" TimeCreated="03/29/2018 04:27:37" Message="SQL Server Scheduled Job 'Transfer ZEDs_VP' (0xA5FB01B26413D44BA237B477D03D50B8) - Status: Fehler - Invoked on: 2018-03-29 04:12:00 - Message: Auftragsfehler  Der Auftrag wurde von Zeitplan 13 (Alle 3 Minuten) aufgerufen. Zuletzt wurde Schritt 1 (Transfer VP) ausgeführt." />
				<Ereignis Nr.="2" Protokollname="Application" Quelle="SQLSERVERAGENT" Ebene="Warnung" Id="208" TimeCreated="03/27/2018 23:57:04" Message="SQL Server Scheduled Job 'Transfer_ZEDs_STE' (0x5CDB83FB9324EB43A84A5AC59DFED8AC) - Status: Fehler - Invoked on: 2018-03-27 23:57:00 - Message: Auftragsfehler  Der Auftrag wurde von Zeitplan 13 (Alle 3 Minuten) aufgerufen. Zuletzt wurde Schritt 1 (Transfer STE) ausgeführt." />
				...
			</Application>
			<System>
				<Ereignis Nr.="1" Protokollname="..." Quelle="..." Ebene="..." Id="..." TimeCreated="..." Message="..." />
			</System>
		</Ereignisliste>
		<PruefungUhrzeit Uhrzeitremote="04/04/2018 10:30:50" Uhrzeitlokal="04/04/2018 10:30:51" />
		<Laufwerke>
			<Laufwerk Driveletter="A:" VolumeName="" Size="" FreeSpace="" />
			<Laufwerk Driveletter="C:" VolumeName="System" Size="10487197696" FreeSpace="4781629440" />
			<Laufwerk Driveletter="D:" VolumeName="Daten" Size="32448696320" FreeSpace="29048705024" />
			<Laufwerk Driveletter="E:" VolumeName="" Size="" FreeSpace="" />
		</Laufwerke>
		<Softwareversion />
	</BBMAG1100>
	<BBMAG1101>
		...
	</BBMAG1101>
	<BBMAG1102>
		...
	</BBMAG1102>
</WartungNG>

(BBMAG1100 through BBMAG1113 are all structured like the example shown.)

I would like to extract the source of each event (e.g. BBMAG1100) together with the other attributes such as timestamp, number, name, and so on.

Do I need to use multiline, or can the Xml filter plugin parse this anyway?
I don't really know how to tell the filter what I want the resulting JSON data to look like.
Do I need to tell the filter about the different "levels", like:

<WartungNG>
	<BBMAG1100>
		<Ereignisliste>
			<Application>

I think I'm missing the point of how the Xml filter plugin works; I hope someone can help me with this.
Thanks in advance for your time.
Best regards

I would start with something like this:

input {
    file {
        path => "/somepath/somefile.xml"
        # The pattern never matches, so every line is appended to the
        # previous event until auto_flush_interval flushes it.
        codec => multiline {
            pattern => "randomString"
            negate => true
            what => "previous"
            auto_flush_interval => 2
        }
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
output { stdout { codec => rubydebug } }
filter {
    xml {
        source => "message"
        target => "theXML"
        store_xml => true
    }
}

Hey Badger,
thanks for the reply. I tried something like that in the meantime, but it didn't work that well. I got one event per row, but each gets an _xmlparsefailure tag, which makes sense because I did not tell the filter what the data looks like. What I was wondering: do I even need to use multiline? My events are each limited to one line, without a CRLF between them.
Is it instead possible to use xpath like this:

xpath =>
[
"/WartungNG/BBMAG1100/Ereignisliste/Application/text()", ...

]
Regards, Thorsten

Well, in your initial post you showed pseudo-XML spread over 26 lines. If it is really all on one line, then you do not need the multiline codec.

You do not have to tell the xml filter the structure of the XML. It will work it out for itself and produce a structure like this (note that I have not set force_array => false):

        "theXML" => {
        "BBMAG1100" => [
            [0] {
                      "Laufwerke" => [
                    [0] {
                        "Laufwerk" => [
                            [0] {
                                 "VolumeName" => "",
                                       "Size" => "",
                                "Driveletter" => "A:",
                                  "FreeSpace" => ""
                            },
                            [1] {
                                 "VolumeName" => "System",
                                       "Size" => "10487197696",
                                "Driveletter" => "C:",
                                  "FreeSpace" => "4781629440"
                            },
                            [2] {
                                 "VolumeName" => "Daten",
                                       "Size" => "32448696320",
                                "Driveletter" => "D:",
                                  "FreeSpace" => "29048705024"
                            },
                            [3] {
                                 "VolumeName" => "",
                                       "Size" => "",
                                "Driveletter" => "E:",
                                  "FreeSpace" => ""
                            }
                        ]
                    }
                ],
                "PruefungUhrzeit" => [
                    [0] {
                        "Uhrzeitremote" => "04/04/2018 10:30:50",
                         "Uhrzeitlokal" => "04/04/2018 10:30:51"
                    }
                ],
[...]

You could use xpath. (rubydebug arrays are indexed from 0, while XPath positions start at 1, so this sets foo to 4781629440.)

        xpath => {
            "/WartungNG/BBMAG1100/Laufwerke/Laufwerk[2]/@FreeSpace" => "foo"
        }
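The 1-based XPath position can be checked outside Logstash with Ruby's bundled REXML library. This is a standalone sketch of the same expression against a cut-down version of the sample drives, not the xml filter itself:

```ruby
require 'rexml/document'

# Two of the drives from the sample log. XPath positions start at 1,
# so Laufwerk[2] selects the second element, the C: drive.
doc = REXML::Document.new(<<~XML)
  <Laufwerke>
    <Laufwerk Driveletter="A:" FreeSpace="" />
    <Laufwerk Driveletter="C:" FreeSpace="4781629440" />
  </Laufwerke>
XML

free = REXML::XPath.first(doc, "/Laufwerke/Laufwerk[2]/@FreeSpace")
puts free.value  # prints 4781629440
```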

If you have not set force_array => false, then you will probably want to mutate away the arrays:

if [foo] { mutate { replace => { "foo" => "%{[foo][0]}" } } }

Hey Badger,
obviously I am horrible at explaining what I want to do; I'm sorry for that ^^. What I meant to say is that the XML file contains the events from multiple servers (in this case BBMAG1100, BBMAG1101, ..., BBMAG1113), and I want to parse every entry, like

<Ereignis Nr.="1" Protokollname="Application" Quelle="SQLSERVERAGENT" Ebene="Warnung" Id="208" TimeCreated="03/29/2018 04:27:37" Message="SQL Server Scheduled Job 'Transfer ZEDs_VP' (0xA5FB01B26413D44BA237B477D03D50B8) - Status: Fehler - Invoked on: 2018-03-29 04:12:00 - Message: Auftragsfehler  Der Auftrag wurde von Zeitplan 13 (Alle 3 Minuten) aufgerufen. Zuletzt wurde Schritt 1 (Transfer VP) ausgeführt." />

or 

<Ereignis Nr.="2" Protokollname="Application" Quelle="SQLSERVERAGENT" Ebene="Warnung" Id="208" TimeCreated="03/27/2018 23:57:04" Message="SQL Server Scheduled Job 'Transfer_ZEDs_STE' (0x5CDB83FB9324EB43A84A5AC59DFED8AC) - Status: Fehler - Invoked on: 2018-03-27 23:57:00 - Message: Auftragsfehler  Der Auftrag wurde von Zeitplan 13 (Alle 3 Minuten) aufgerufen. Zuletzt wurde Schritt 1 (Transfer STE) ausgeführt." />

as one event to Kibana. That should work without multiline, I guess. The problem is that I need the source server, e.g. BBMAG1100 (as listed in my first post), in every event. The outcome should look something like:

event 1 -->

"Nr.": "1",
"server": "BBMAG1100",
"name": "Application",
"source": "SQLSERVERAGENT",
...

event 2 -->

"Nr.": "2",
"server": "BBMAG1100",
"name": "Application",
"source": "SQLSERVERAGENT",
...

In this context I was wondering whether I should build multiple pipelines, one for every server, all using the same document as input but with a different xpath condition in the filter.

Anyway, for now I'm going to try to get your latest solution working.
Thank you very much for your support!
Best regards, Thorsten

If you just had to parse BBMAG1100 then I would suggest something like

xml { source => "message" target => "theXML" store_xml => true force_array => false }
split { field => "[theXML][BBMAG1100][Ereignisliste][Application][Ereignis]" }
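Conceptually, split clones the event once per element of the target array. A minimal plain-Ruby sketch of that effect, using hypothetical sample data rather than the filter's actual implementation:

```ruby
# What split does, conceptually: one output event per element of the
# array field, with the rest of the event copied into each clone.
base = { "server" => "BBMAG1100" }
ereignisse = [{ "Nr." => "1" }, { "Nr." => "2" }]

events = ereignisse.map { |e| base.merge("Ereignis" => e) }
events.each { |ev| puts ev.inspect }
```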

However, I think you want all the Ereignis entries. Using regexes to parse the XML is very fragile, but for that specific XML this would work.

if [message] =~ /\s+<BB/ {
    # Strip the leading whitespace plus "<" and ">" so only the server name remains
    mutate { gsub => [ "message", "\s+<", "", "message", ">", "" ] }
    # Stash the current server name in a class variable
    ruby { code => '@@servername = event.get("message")' }
    drop {}
} else if [message] =~ /\s+<Ereignis / {
    # Attach the most recently stashed server name to this event
    ruby { code => 'event.set("servername", @@servername)' }
    xml { source => "message" target => "theXML" store_xml => true force_array => false }
} else {
    drop {}
}
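The idea behind the two ruby filters (remember the last <BB…> tag seen, attach it to every following <Ereignis …> line) can be sketched in plain Ruby. This is a simplified model with a hypothetical helper, assuming lines are processed strictly in order:

```ruby
# Minimal model of the stateful parse: remember the last <BB...> server
# tag and attach it to every <Ereignis ...> line. Assumes lines are
# handled strictly in file order.
def tag_events(lines)
  servername = nil
  events = []
  lines.each do |line|
    if line =~ /\s+<BB/
      # e.g. "\t<BBMAG1100>" -> "BBMAG1100"
      servername = line.gsub(/\s+</, "").gsub(">", "")
    elsif line =~ /\s+<Ereignis /
      events << { "servername" => servername, "message" => line.strip }
    end
  end
  events
end

lines = [
  "\t<BBMAG1100>",
  "\t\t<Ereignis Nr.=\"1\" />",
  "\t<BBMAG1101>",
  "\t\t<Ereignis Nr.=\"1\" />",
]
tag_events(lines).each { |e| puts e["servername"] }
# prints BBMAG1100 then BBMAG1101
```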

Hi Badger,
thanks again for your reply. That's a smart way of doing it. I tried that config (without multiline) and it works almost perfectly.

However, I also wanted to give Kibana the right timestamp (which has worked in many other config files), like this:

filter {
    if [message] =~ /\s+<BB/ {
        mutate { gsub => [ "message", "\s+<", "", "message", ">", "" ] }
        ruby { code => '@@servername = event.get("message")' }
        drop {}
    } else if [message] =~ /\s+<Ereignis / {
        ruby { code => 'event.set("servername", @@servername)' }
        xml {
            source => "message"
            target => "theXML"
            store_xml => true
            force_array => false
            xpath => {
                "/Ereignis[1]/@TimeCreated" => "timestamp"
            }
        }
        date {
            match => [ "timestamp", "MM/dd/yyyy HH:mm:ss" ]
            timezone => "Europe/Berlin"
        }
    } else {
        drop {}
    }
}

That writes the correct date + time into my timestamp variable (as I can see in the JSON output), but I get _dateparsefailure because Kibana can't interpret the date the right way:

{
  "_index": "xml",
  "_type": "doc",
  "_id": "fGtBkWMB35noYLabD0Xm",
  "_version": 1,
  "_score": null,
  "_source": {
    "timestamp": [
      "04/04/2018 10:30:00"
    ],
    "host": "elastic",
...
      "TimeCreated": "04/04/2018 10:30:00",
...

As you can see, the value and the format should be correct, but I think the type of my timestamp does not match. So I tried to capture the timestamp as separate elements and then mutate them into a single field, like this:

else if [message] =~ /\s+<Ereignis / {
    ruby { code => 'event.set("servername", @@servername)' }
    xml {
        source => "message"
        target => "theXML"
        store_xml => true
        force_array => false
        xpath => {
            "/Ereignis[1]/@TimeCreated" => "[%{DATE_US:Datum} %{TIME:Zeit}]"
        }
    }
    mutate {
        add_field => {
            "timestamp" => "%{Datum} %{Zeit}"
        }
    }
    date {
        match => [ "timestamp", "MM/dd/yyyy HH:mm:ss" ]
        timezone => "Europe/Berlin"
    }
}

That doesn't work either. I think I'm missing something pretty simple here :confused:

As always, thank you very much for your help; I really appreciate it!
Best regards, Thorsten

I solved it by simply reading the Xml filter plugin documentation; sorry, my bad. Values extracted with the xpath option are always stored as arrays of strings. Solution:

xml {
    source => "message"
    target => "theXML"
    store_xml => true
    force_array => false
    xpath => {
        "/Ereignis[1]/@TimeCreated" => "arrayTimestamp"
    }
}
mutate {
    add_field => { "timestamp" => "%{arrayTimestamp[0]}" }
    remove_field => [ "arrayTimestamp" ]
}
date {
    match => [ "timestamp", "MM/dd/yyyy HH:mm:ss" ]
    timezone => "Europe/Berlin"
}
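The Joda-style pattern MM/dd/yyyy HH:mm:ss given to the date filter can be sanity-checked against the sample value with plain Ruby's strptime, where the equivalent layout is %m/%d/%Y %H:%M:%S. This only verifies the format string, it is not the date filter itself:

```ruby
require 'time'

# "MM/dd/yyyy HH:mm:ss" (Joda, date filter) corresponds to
# "%m/%d/%Y %H:%M:%S" (strptime).
ts = Time.strptime("04/04/2018 10:30:00", "%m/%d/%Y %H:%M:%S")
puts ts  # parsed in the local zone; Logstash applies Europe/Berlin instead
```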

My last remaining problem, which I really don't get, is the inconsistent assignment of the servername.
Everything else works just fine; both the number of events and the content of each parsed event are correct. The strange thing is that sometimes the right servername is assigned to an event and sometimes it isn't. If I restart Logstash without changing any config, the assignment changes, but it is still not correct in every case.
I checked the filter of my pipeline again, but if it is processed sequentially, it makes total sense to me.

If there is a (new) servername:

if [message] =~ /\s+<BB/ {
    mutate { gsub => [ "message", "\s+<", "", "message", ">", "" ] }

--> removes the unnecessary characters

ruby { code => '@@servername = event.get("message")' }

--> stores the current servername with the get method

drop {}

Now, when the events start to get parsed:

} else if [message] =~ /\s+<Ereignis / {
    ruby { code => 'event.set("servername", @@servername)' }

--> writes the current servername (with the set method) into a field of that particular event.

It would be great if someone has an idea.
Best regards, Thorsten

Are you using '--pipeline.workers 1'? As I said, this kind of parsing and stashing state in class variables is fragile and, in particular, probably not thread-safe.
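Why the shared variable misbehaves with more than one worker can be shown deterministically in plain Ruby: the same per-line logic produces correct tags when lines arrive in file order, and wrong ones as soon as the lines are interleaved the way concurrent workers may consume them. This is a simplified model (pre-trimmed lines, no real threads), not the actual pipeline:

```ruby
# Simplified model of the filter: stash the last server tag, attach it
# to each Ereignis line, drop everything else (returns nil).
servername = nil
process = lambda do |line|
  if line.start_with?("<BB")
    servername = line.delete("<>")
    nil                      # like drop {}
  elsif line.start_with?("<Ereignis")
    servername               # like event.set("servername", @@servername)
  end
end

in_order    = ["<BBMAG1100>", "<Ereignis 1/>", "<BBMAG1101>", "<Ereignis 2/>"]
interleaved = ["<BBMAG1100>", "<BBMAG1101>", "<Ereignis 1/>", "<Ereignis 2/>"]

p in_order.map(&process).compact     # => ["BBMAG1100", "BBMAG1101"]
p interleaved.map(&process).compact  # => ["BBMAG1101", "BBMAG1101"]
```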


Hey Badger,

that absolutely makes sense. I was wondering for quite some time how it was possible for the servername allocation to be that unstable, but considering that multiple workers process the same logfile, it makes sense now.
I changed the number of pipeline workers from the default to 1, and now it works perfectly (despite the fragile parsing).

You really helped me a lot, so thank you very much. (By the way, I don't know which post to mark as the solution because all of them were helpful, so I'll just go with the most important one for me.)

Best regards,
Thorsten

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.