XML to Elasticsearch via Logstash


#1

So I am trying to parse an XML file which is a few thousand lines long, and looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<CONSOLIDATED_LIST xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="https://www.BALAHAALALALA.xsd" dateGenerated="2018-02-02T12:12:12.111-12:12">
   <INDIVIDUALS>
      <INDIVIDUAL>
         <DATA>3333</DATA>
         <VERSION>1</VERSION>
         <FIRST_NAME>JO</FIRST_NAME>
         <SECOND_NAME>SMITH</SECOND_NAME>
         <THIRD_NAME />
         <SOME_LIST_TYPE>AAA</SOME_LIST_TYPE>
         <REFERENCE_NUMBER>111</REFERENCE_NUMBER>
         <LISTED_ON>2014-01-01</LISTED_ON>
         <COMMENTS>Hello</COMMENTS>
         <NATIONALITY>
            <VALUE>Hey</VALUE>
         </NATIONALITY>
         <LIST_TYPE>
            <VALUE>some List</VALUE>
         </LIST_TYPE>
         <LAST_DAY_UPDATED>
            <VALUE />
         </LAST_DAY_UPDATED>
         <NICKNAME>
            <QUALITY />
            <VALUE />
         </NICKNAME>
         <ADDRESS>
            <COUNTRY />
         </ADDRESS>
         <DOB>
            <DATE>1990-05-11</DATE>
         </DOB>
         <POB/>
      </INDIVIDUAL>
      <INDIVIDUAL>
         <DATA>1111111</DATA>
         <VERSION>2</VERSION>
         <FIRST_NAME>BEEP</FIRST_NAME>
         <SECOND_NAME>BOOP</SECOND_NAME>
         <THIRD_NAME />
         <SOME_LIST_TYPE>DJJ</SOME_LIST_TYPE>
         <REFERENCE_NUMBER>4444</REFERENCE_NUMBER>
         <LISTED_ON>2016-12-25</LISTED_ON>
         <COMMENTS>ASDFJKLH DSGHJDSGH LAS GFOWE I .</COMMENTS>
         <NATIONALITY>
            <VALUE>ASDFS SDAF SDGSDAGSGDSG SDAG </VALUE>
         </NATIONALITY>
         <LIST_TYPE>
            <VALUE>SOME List</VALUE>
         </LIST_TYPE>
         <LAST_DAY_UPDATED>
            <VALUE />
         </LAST_DAY_UPDATED>
         <NICKNAME>
            <QUALITY />
            <VALUE />
         </NICKNAME>
         <ADDRESS>
            <COUNTRY />
         </ADDRESS>
         <DOB>
            <DATE>1990-01-01</DATE>
         </DOB>
         <POB/>
      </INDIVIDUAL>
    </INDIVIDUALS>
</CONSOLIDATED_LIST>

My input and filter sections look like this

input {
    file { 
        id => "Ingest"
        path => ["c:/ELK-Stack/un/small.xml"]
        start_position => "beginning"
        ignore_older => 0 
        sincedb_path => "NUL"   
        codec => multiline {
            pattern => "<CONSOLIDATED_LIST>"
            negate => "true"
            what => "previous"
        }     
    }

}
  
filter {


    xml {
        id => "Parse"
        store_xml => false
        source => "message"
        target => "xml_content"
        force_array => true
        xpath => [
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/DATAID/text()", "DATA",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/VERSIONNUM/text()", "VERSION",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/FIRST_NAME/text()", "FIRST_NAME",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/SECOND_NAME/text()", "SECOND_NAME",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/THIRD_NAME/text()", "THIRD_NAME",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/UN_LIST_TYPE/text()", "SOME_LIST_TYPE",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/REFERENCE_NUMBER/text()", "REFERENCE_NUMBER",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/COMMENTS1/text()", "COMMENTS",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/NATIONALITY/VALUE/text()", "NATIONALITY",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/LIST_TYPE/VALUE/text()", "LIST_TYPE",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/LAST_DAY_UPDATED/VALUE/text()", "LAST_DAY_UPDATED",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/NICKNAME/QUALITY/text()", "QUALITY",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/NICKNAME/ALIAS_NAME/text()", "ALIAS_NAME",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/ADDRESS/COUNTRY/text()", "COUNTRY",
            "CONSOLIDATED_LIST/INDIVIDUALS/INDIVIDUAL/DOB/DATE/text()", "DATE",
            "CONSOLIDATED_LIST/INDIVIDUALS/POB/text()", "INDIVIDUAL_PLACE_OF_BIRTH"     
        ]
    }

    mutate {
        remove_field => ['@timestamp', 'message', 'host', '@version', 'path']
    }
}

The output produced through the console is as follows

 "NATIONALITY" => [
[0] "hey",
[1] "ASDFS SDAF SDGSDAGSGDSG SDAG "

],
"DATA" => [
[0] "3333",
[1] "1111111"
],

"SOME_LIST_TYPE" => [
[0] "AAA",
[1] "DJJ"

],
"COMMENTS" => [
[0] "Hello ."
],
"REFERENCE_NUMBER" => [
[0] "111",
[1] "4444"
],

  "FIRST_NAME" => [
[0] "RI",
[1] "BEEP"

],
"tags" => [
[0] "multiline"
],
"SECOND_NAME" => [
[0] "WON HO",
[1] "BOOP"
],
"DATE" => [
[0] "1990-05-11",
[1] "1990-01-01"
],
"NICKNAME" => [
[0] "sup"
],
"VERSION" => [
[0] "1",
[1] "2"
],
"LIST_TYPE" => [
[0] "some List",
[1] "SOME List"
]
}

I would instead like the output to display something like this (variables aren't in order, so like a normal logstash console output):

"Nationality" => "hey",
"DATA" => "3333",
"VERSION" => "1",
"FIRST_NAME" => "RI",

etc..

"Nationality" => "ASDFS SDAF SDGSDAGSGDSG SDAG ",
"DATA" => "1111111",
"VERSION" => "2",
"FIRST_NAME" => "BEEP",

I think my problem might be with the multiline codec, or that I'm missing a step after I've completed the xpath, however I'm not too sure how to proceed. I've tried setting the multiline codec pattern at "" however this doesn't really help. I also can't seem to split my data, as I always get this error (Only String and Array types are splittable. field:INDIVIDUALS/INDIVIDUAL/CONSOLIDATED_LIST is of type = NilClass). I also cant convert the type to string...
I've had a look around the forum and tried various solutions but my result doesn't really seem to change to the output I'm after. The config file posted here looks promising but it doesn't really work for me either (Parsing xml using logstaash xpath)

Any help would be much appreciated.


(Walker) #2

I think you want to use <Individual> for your multiline and then start your xpath with Individual. XPath syntax says that if you don't specify a leading / then it looks at ALL matching records. Since there are multiple matches for each field, you get an array of values in the results.

Logstash processes a single line in a file as a single event. Matching on Consolidated_List, your data looks like this when it hits the XML parser:

 `<CONSOLIDATED_LIST xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="https://www.BALAHAALALALA.xsd" dateGenerated="2018-02-02T12:12:12.111-12:12"><INDIVIDUALS><INDIVIDUAL><DATA>3333</DATA><VERSION>1</VERSION><FIRST_NAME>JO</FIRST_NAME><SECOND_NAME>SMITH</SECOND_NAME><THIRD_NAME /><SOME_LIST_TYPE>AAA</SOME_LIST_TYPE><REFERENCE_NUMBER>111</REFERENCE_NUMBER><LISTED_ON>2014-01-01</LISTED_ON><COMMENTS>Hello</COMMENTS><NATIONALITY><VALUE>Hey</VALUE></NATIONALITY><LIST_TYPE><VALUE>some List</VALUE></LIST_TYPE><LAST_DAY_UPDATED><VALUE /></LAST_DAY_UPDATED><NICKNAME><QUALITY /><VALUE /></NICKNAME><ADDRESS><COUNTRY /></ADDRESS><DOB><DATE>1990-05-11</DATE></DOB><POB/></INDIVIDUAL><INDIVIDUAL><DATA>1111111</DATA><VERSION>2</VERSION><FIRST_NAME>BEEP</FIRST_NAME><SECOND_NAME>BOOP</SECOND_NAME><THIRD_NAME /><SOME_LIST_TYPE>DJJ</SOME_LIST_TYPE><REFERENCE_NUMBER>4444</REFERENCE_NUMBER><LISTED_ON>2016-12-25</LISTED_ON><COMMENTS>ASDFJKLH DSGHJDSGH LAS GFOWE I .</COMMENTS><NATIONALITY><VALUE>ASDFS SDAF SDGSDAGSGDSG SDAG </VALUE></NATIONALITY><LIST_TYPE><VALUE>SOME List</VALUE></LIST_TYPE><LAST_DAY_UPDATED><VALUE /></LAST_DAY_UPDATED><NICKNAME><QUALITY /><VALUE /></NICKNAME><ADDRESS><COUNTRY /></ADDRESS><DOB><DATE>1990-01-01</DATE></DOB><POB/></INDIVIDUAL></INDIVIDUALS></CONSOLIDATED_LIST>

Next up, the XML parser starts looking for matches to your xpath paths, First_Name for example will result in JO and BEEP being found and placed in the First_Name field as an array.

If you change your multiline to Individual you'll end up with data like:

> <CONSOLIDATED_LIST xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="https://www.BALAHAALALALA.xsd" dateGenerated="2018-02-02T12:12:12.111-12:12">
> <INDIVIDUALS>
> <INDIVIDUAL><DATA>3333</DATA><VERSION>1</VERSION><FIRST_NAME>JO</FIRST_NAME><SECOND_NAME>SMITH</SECOND_NAME><THIRD_NAME /><SOME_LIST_TYPE>AAA</SOME_LIST_TYPE><REFERENCE_NUMBER>111</REFERENCE_NUMBER><LISTED_ON>2014-01-01</LISTED_ON><COMMENTS>Hello</COMMENTS><NATIONALITY><VALUE>Hey</VALUE></NATIONALITY><LIST_TYPE><VALUE>some List</VALUE></LIST_TYPE><LAST_DAY_UPDATED><VALUE /></LAST_DAY_UPDATED><NICKNAME><QUALITY /><VALUE /></NICKNAME><ADDRESS><COUNTRY /></ADDRESS><DOB><DATE>1990-05-11</DATE></DOB><POB/></INDIVIDUAL>
> <INDIVIDUAL><DATA>1111111</DATA><VERSION>2</VERSION><FIRST_NAME>BEEP</FIRST_NAME><SECOND_NAME>BOOP</SECOND_NAME><THIRD_NAME /><SOME_LIST_TYPE>DJJ</SOME_LIST_TYPE><REFERENCE_NUMBER>4444</REFERENCE_NUMBER><LISTED_ON>2016-12-25</LISTED_ON><COMMENTS>ASDFJKLH DSGHJDSGH LAS GFOWE I .</COMMENTS><NATIONALITY><VALUE>ASDFS SDAF SDGSDAGSGDSG SDAG </VALUE></NATIONALITY><LIST_TYPE><VALUE>SOME List</VALUE></LIST_TYPE><LAST_DAY_UPDATED><VALUE /></LAST_DAY_UPDATED><NICKNAME><QUALITY /><VALUE /></NICKNAME><ADDRESS><COUNTRY /></ADDRESS><DOB><DATE>1990-01-01</DATE></DOB><POB/></INDIVIDUAL>
> </INDIVIDUALS>
> </CONSOLIDATED_LIST>

While this will then give you the lines you want, it will result in additional events being created for the four other lines. Whether or not that matters is up to you, I had a similar issue and built a PowerShell script to strip off extraneous fields like that, though I'm not sure it will help you here I am glad to share it with you if you like.


#3

Thanks for replying!
Okay so I made the following changes to the codec:
codec => multiline {
pattern => "</INDIVIDUAL/>" (doesn't include the two '/', only here to display the brackets)
negate => "true"
what => "previous"
}

As for xpath, I tried 3 different paths --> starting with '/', '//' and no slash ("/INDIVIDUAL/DATA/text()", "DATA", "//INDIVIDUAL/DATA/text()", "DATA" AND "INDIVIDUAL/DATA/text()", "DATA") and they all produced the same following output:

[2018-02-21T10:50:06,984][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
{
    "tags" => [
        [0] "multiline"
    ]
}
{
         "NATIONALITY" => [
        [0] "hey"
    ],
              "DATA" => [
        [0] "3333"
    ],
              "REFERENCE_NUMBER" => [
        [0] "111"
    ],
           "COMMENTS1" => [
        [0] "Hello"
    ],

etc...
The displayed output is the same even when the "force_array => true" setting is removed .
My second 'individual' section isn't displaying now as well as only the first one is being read in by the looks of it.
Although I'm not up to that step yet, I'd love to see the PS script you've made as it may come in handy


(Walker) #4

Guessing here as my stuff is all at home, but set Force_Array to false, default for that setting is true.

https://www.elastic.co/guide/en/logstash/current/plugins-filters-xml.html


(Walker) #5

For your xpath, /INDIVIDUAL/DATA/text() should work. Remember, each line is processed by the XML parser so your root for each line is Individual.


#6

I made both the changes you just recommended and I'm still getting the same output as post 3 strangely enough. I am testing the right file, I've double checked haha.
Also, the second set of data under 'individual' only displays when I go to shut down the pipeline (CTRL + C)

[2018-02-21T11:23:33,884][WARN ][logstash.runner          ] SIGINT received. Shutting down the agent.
{
         "NATIONALITY" => [
        [0] "ASDFS SDAF SDGSDAGSGDSG SDAG "
    ],

Could the error be to do with my xml file itself? Or is the file fine as I haven't encountered any file specific errors?


#7

I've validated my xml file and it has no errors, which means the issue is definitely with my config file...


#8

adding in the following seems to help:

mutate {
        replace => {
            "NATIONALITY" => "%{[NATIONALITY][0]}"
            "DATA" => "%{[DATA][0]}"
            "VERSION" => "%{[VERSION][0]}"
            "FIRST_NAME" => "%{[FIRST_NAME][0]}"

etc..
}

output:

                 "DATE" => "%{[DATE][0]}",
           "THIRD_NAME" => "%{[THIRD_NAME][0]}",

}
{
              "NATIONALITY" => "hey",
                      DATA" => "3333",
} etc...

This looks somewhat right, however the first time I run the file, the first set of outputs is completely empty like so:

                "NATIONALITY" => "%{[NATIONALITY][0]}",
                       "DATA" => "%{[DATA][0]}",
                 "SOME_LIST_TYPE" => "%{[SOME_LIST_TYPE][0]}",
                    "COMMENTS" => "%{[COMMENTS][0]}",

etc..
so I still don't think the multiline codec isn't working quite right...


(Walker) #9

What version of Logstash are you using?


#10

I installed it back in Dec 2017, so I assume 6.x?


(Walker) #11

Also, the second set of data under 'individual' only displays when I go to shut down the pipeline

Use the auto_flush_interval option in the multiline config. The file input keeps files open and monitors them for changes, the last record is never fully processed until a new event writes to the file or the pipeline reloads/shuts down.

Threw your example file above into an xml file, used this as my pipeline:

input {
  file { 
    id => "Ingest"
    path => ["c:/example/*.xml"]
    start_position => "beginning"
    ignore_older => 0 
    sincedb_path => "NUL"   
    codec => multiline {
      pattern => "<INDIVIDUAL>"
      negate => "true"
      what => "previous"
      auto_flush_interval => 2
    }
  }
}
filter {
  xml {
    id => "Parse"
    store_xml => false
    source => "message"
    target => "xml_content"
    force_array => true
    xpath => [
      "INDIVIDUAL/DATA/text()", "DATA",
      "INDIVIDUAL/VERSION/text()", "VERSION",
      "INDIVIDUAL/FIRST_NAME/text()", "FIRST_NAME",
      etc....

I spent some time trying to work with the split function but could never get it to work with more than one field, not sure if it's possible, what the syntax is, or if you have to specify split {} each time you want to split a field. Using it on just one field worked fine though. This pipeline spit out three events, as shown below. I also spent time with the drop filter trying to get it to drop the 3rd erroneous error but couldn't figure that one out either.


(Walker) #12

Figured out the drop filter for your use case, which in turn will help me with mine, HUZZAH! Throw this in after your XML filter: if "xml version" in [message] { drop { }}


(Walker) #13

Below is my PowerShell XML restructure script. While the first half of the script involves decompressing archives and renaming files, stuff that probably doesn't apply to your use case, there are references to variables in the part that you may be interested in. Also, if you're a PowerShell guru, let me know if there are areas for improvement....my code is probably not the most elegant.

##############Archive Decompression##############
$ARoot = Read-Host "Folder containing archives"
$FDst = Read-Host "Destination path"
$Ingest = Read-Host "Ingest Folder"
Clear-Host
Write-Host "Installing 7Zip4Powershell Module"
Install-Package 7Zip4Powershell -Force
$ARepo = Get-Childitem $ARoot | Select Name
foreach ($AFile in $ARepo) {
$AName = $AFile | Select -ExpandProperty Name
Expand-7zip -Verbose $ARoot\$AName -TargetPath $FDst
}
$NameCheck = Get-Childitem $FDst | Where-Object {$_.Name -notlike "*.xml"}
foreach ($Name in $NameCheck) {
$Name | Rename-Item -NewName { [io.path]::ChangeExtension($_.name, "") }
}
#################XML Restructure#################
$XMLRoot = $FDst
$XMLRepo = Get-Childitem $XMLRoot | Select -ExpandProperty Name
foreach ($xmlfile in $xmlrepo) {
[XML]$xml = Get-Content -Path "$XMLRoot\$xmlfile" 
$xmlrecord = $xml.feedback.record
foreach ($record in $xmlrecord) {
$xmlreport = $xml.SelectSingleNode("//feedback/report_metadata").Clone()
$xmlpolicy = $xml.SelectSingleNode("//feedback/policy_published").Clone()
$record.AppendChild($xmlreport)
$record.AppendChild($xmlpolicy)
}
$xmlpolicy = $xml.SelectSingleNode("//feedback/policy_published")
$xmlreport = $xml.SelectSingleNode("//feedback/report_metadata")
$xml.feedback.RemoveChild($xmlreport)
$xml.feedback.RemoveChild($xmlpolicy)
$xml.Save("$Ingest\$xmlfile") 
}

#14

Oh wow, thanks a lot for all this!
In your filter section, after you finish defining xpath do you have a mutate function? (like I mentioned above) or is it not needed? And then does the following:
"if "xml version" in [message] { drop { }}"
goes underneath mutate but still inside filter?
Also, what does your console output looks like? like for the variables that are empty does it display as:
"%{[NATIONALITY][0]}"
Also, do empty fields not get pushed to elasticsearch automatically in your case?
Thanks a lot for the PS script btw, I'm an amateur with PS so I can't really give you any constructive feedback other than it looks really good! :rofl:


(Walker) #15

I have no clue what it looks like in Elasticsearch nor the commands to display it, lol. For simplicities sake, I stopped at just the XPath statements, verified basic functionality with that and then added the drop filter if statement after the XML filter. Whether or not you want to drop those fields is up to you. In my use case, I hold onto the message field for a kind of "raw data" view. Also, while I'm not sure how Elasticsearch displays the data in the console, in Kibana, you can specify a field to use as a timestamp, as long as it's formatted as a date field.


(Magnus Bäck) #16

I would instead like the output to display something like this (variables aren't in order, so like a normal logstash console output):

You have two options:

  • Don't use XPath to extract the field values but instead enable store_xml. That'll give you a JSON structure that more closely resembles the input XML, but then you'll have to remove the fields you don't want.
  • Use a ruby filter to join the elements of the arrays into an array of the kind of events you'd like to see in the end:
{
  ...
  "whatever": [
    {"Nationality": "hey", "DATA": "3333", ...},
    {"Nationality": "ASDFS SDAF SDGSDAGSGDSG SDAG ", "DATA": "1111111", ...},
    ...
  ],
  ...
}

Then use the split filter on that field (whatever in the example above).

I do not recommend the path suggested by @wwalker. Splitting on individual subelements like INDIVIDUAL is brittle since it assumes the XML is pretty-printed.


#17

Thanks so much for all your help!! I hope helping me ended up helping you too :slight_smile:


#18

Hi Magnus,

I'm going down the route of the first option you suggested. Once I've pushed the data into kibana, I noticed that some fields sometimes have arrays. For example the variable 'nickname', sometimes it's a normal variable (holds just one value), while other times it holds an array when it contains too many values. My problem right now is that I have too many fields as well as too much data to sift through it all manually to determine which ones sometimes contain arrays. Is there a way to do this through the ruby filter perhaps? Would it be possibly to write code which reads in each variable and, if it contains an array, it flattens/splits it somehow to ensure kibana can handle it properly?


(Magnus Bäck) #19

Kibana doesn't care if a field contains an array of strings instead of just a string, but sure, a ruby filter can help here.


#20

Would you be able to give me an example, or guide me towards what sort of things I should include in the ruby filter? I'm not too familiar with it so any help would be much appreciated.