Logstash XML parse with meta-data

I don't like to needlessly bother people with long questions, so I'll keep this short and searchable. I need some logstash-xml expertise. Problem:

  • I have a large directory of xml files that need to be inserted into elasticsearch, at the moment we're using a custom parser.
  • Each xml file will have one to many foo:visits, each one of which needs to be a separate document in Elasticsearch, as we want to store visits.
  • Here's an example xml file:
<?xml version="1.0" encoding="iso-8859-1"?>
<!-- The foo prefix must be bound to the namespace it is used with;
     the actual file (see the Filebeat output later in the thread)
     declares xmlns:foo, not xmlns:atk. -->
<foo:statistics xsi:schemaLocation="http://www.foo.no foo.xsd"
   xmlns:foo="http://www.foo.no"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <foo:version>1.0</foo:version>
    <foo:name>bar</foo:name>
    <foo:start>2017-01-01T00:06:34.880+02:00</foo:start>
    <foo:stop>2017-05-01T00:06:34.880+02:00</foo:stop>
    <foo:place>
        <foo:name>Bar</foo:name>
        <foo:id>1B445T4UV</foo:id>
    </foo:place>
    <!-- List of visits follow -->
    <foo:visit>
        <foo:date>2017-04-17T04:06:34.880+02:00</foo:date>
        <foo:status>2</foo:status>
    </foo:visit>
    <foo:visit>
        <foo:date>2017-04-18T04:06:34.880+02:00</foo:date>
        <foo:status>3</foo:status>
    </foo:visit>
    <foo:visit>
        <foo:date>2017-04-19T04:06:34.880+02:00</foo:date>
        <foo:status>1</foo:status>
    </foo:visit>
    <!-- foo:visit continues... -->
</foo:statistics>

We basically want to store each foo:visit, but we also want the metadata given at the top of the xml file to be stored with each visit in elasticsearch.

Is this possible using logstash?

Use an xml filter to parse the whole XML payload, then use a split filter on the array of foo:visits elements.

Thanks for the advice @magnusbaeck, I've arrived at this configuration:

# Pipeline: receive XML events from Filebeat, extract the file-level
# metadata, then split each <foo:visit> into its own event.
input {
    beats {
        port => 5000
    }
}

filter {
        # NOTE(review): the XPath expressions below use the foo: prefix but
        # no `namespaces` option is declared on this filter, so Nokogiri
        # rejects them as a syntax error ("undefined prefix") — this is the
        # exact exception shown in the log that follows.
        xml {
                source => "message"
                store_xml => "false"
                xpath => ["/foo:statistics/foo:start/text()", "start"]
                xpath => ["/foo:statistics/foo:stop/text()", "stop"]
                # NOTE(review): <foo:name> and <foo:id> are namespace-qualified
                # in the sample document, so the unprefixed name/id steps here
                # would match nothing even once namespaces are declared.
                xpath => ["/foo:statistics/foo:place/name/text()", "place_name"]
                xpath => ["/foo:statistics/foo:place/id/text()", "place_id"]
                xpath => ["/foo:statistics/foo:visit", "visits"]
        }

        # One event per extracted visit node; the raw document is dropped.
        split {
                field => "visits"
                remove_field => "message"
        }

        # Second parse over the single serialized <foo:visit> fragment.
        # NOTE(review): same missing-namespaces problem as above.
        xml {
                source => "visits"
                store_xml => "false"
                xpath => ["/foo:visit/foo:date/text()", "date"]
                xpath => ["/foo:visit/foo:status/text()", "status"]
        }

        # xpath results are arrays; join flattens them to plain strings
        # before the date filter runs.
        mutate {
                strip => ["date", "status"]
                join => {
                        "date" => ""
                        "status" => ""
                }
                remove_field => "visits"
        }

        # Use the visit's own timestamp as the event @timestamp.
        date {
                match => ["date", "ISO8601"]
        }
}

output {
        stdout { codec => rubydebug }
}

But I'm getting the following error message (note: I'm serving the xml file via filebeat)

logstash         | [2017-10-13T13:55:23,543][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"main", "exception"=>"/foo:statistics/foo:start/text()", "backtrace"=>["nokogiri/XmlXpathContext.java:130:in `evaluate'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.0-java/lib/nokogiri/xml/searchable.rb:198:in `xpath_impl'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.0-java/lib/nokogiri/xml/searchable.rb:179:in `xpath_internal'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.0-java/lib/nokogiri/xml/searchable.rb:154:in `xpath'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.4/lib/logstash/filters/xml.rb:153:in `block in filter'", "org/jruby/RubyHash.java:1343:in `each'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.4/lib/logstash/filters/xml.rb:152:in `filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:145:in `do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:164:in `block in multi_filter'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:161:in `multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filter_delegator.rb:48:in `multi_filter'", "(eval):154:in `block in filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:504:in `block in filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:228:in `block in each'", "org/jruby/RubyHash.java:1343:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:227:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:501:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:477:in `worker_loop'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:439:in `block in start_workers'"], :thread=>"#<Thread:0x32792f92@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:290 sleep>"}
logstash         | [2017-10-13T13:55:23,606][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<Nokogiri::XML::XPath::SyntaxError: /foo:statistics/foo:start/text()>, :backtrace=>["nokogiri/XmlXpathContext.java:130:in `evaluate'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.0-java/lib/nokogiri/xml/searchable.rb:198:in `xpath_impl'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.0-java/lib/nokogiri/xml/searchable.rb:179:in `xpath_internal'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.0-java/lib/nokogiri/xml/searchable.rb:154:in `xpath'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.4/lib/logstash/filters/xml.rb:153:in `block in filter'", "org/jruby/RubyHash.java:1343:in `each'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.4/lib/logstash/filters/xml.rb:152:in `filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:145:in `do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:164:in `block in multi_filter'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:161:in `multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filter_delegator.rb:48:in `multi_filter'", "(eval):154:in `block in filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:504:in `block in filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:228:in `block in each'", "org/jruby/RubyHash.java:1343:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:227:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:501:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:477:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:439:in `block in start_workers'"]}
logstash exited with code 1

Any tips for what is wrong with my config?

EDIT: Just to clarify, I eventually want to store each visit as a document in elasticsearch, including the foo:version, foo:name etc. metadata at the top of the xml file. This is essential, so I hope this is possible.

The error message suggests that it doesn't accept your XPath syntax. Perhaps something with the namespace? I think the xml filter has an option related to namespaces.

Aha! Almost there now — the pipeline seems to be parsing the XML file line by line. I'm assuming I'm supposed to use some sort of multiline buffer, or am I already getting the file line by line from Filebeat?

OUTPUT:

...
logstash         | {
logstash         |     "@timestamp" => 2017-10-13T14:27:49.354Z,
logstash         |         "offset" => 44,
logstash         |       "@version" => "1",
logstash         |           "beat" => {
logstash         |             "name" => "87f6feae71e9",
logstash         |         "hostname" => "87f6feae71e9",
logstash         |          "version" => "6.0.0-rc1"
logstash         |     },
logstash         |           "host" => "87f6feae71e9",
logstash         |     "prospector" => {
logstash         |         "type" => "log"
logstash         |     },
logstash         |         "source" => "/mnt/log/test.XML",
logstash         |        "message" => "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>",
logstash         |           "tags" => [
logstash         |         [0] "beats_input_codec_plain_applied",
logstash         |         [1] "_xmlparsefailure",
logstash         |         [2] "_split_type_failure"
logstash         |     ]
logstash         | }
logstash         | {
logstash         |     "@timestamp" => 2017-10-13T14:27:49.357Z,
logstash         |         "offset" => 108,
logstash         |       "@version" => "1",
logstash         |           "beat" => {
logstash         |             "name" => "87f6feae71e9",
logstash         |         "hostname" => "87f6feae71e9",
logstash         |          "version" => "6.0.0-rc1"
logstash         |     },
logstash         |           "host" => "87f6feae71e9",
logstash         |     "prospector" => {
logstash         |         "type" => "log"
logstash         |     },
logstash         |         "source" => "/mnt/log/test.XML",
logstash         |        "message" => "<foo:statistics xsi:schemaLocation=\"http://www.foo.no foo.xsd\" ",
logstash         |           "tags" => [
logstash         |         [0] "beats_input_codec_plain_applied",
logstash         |         [1] "_xmlparsefailure",
logstash         |         [2] "_split_type_failure"
logstash         |     ]
logstash         | }
logstash         | {
logstash         |     "@timestamp" => 2017-10-13T14:27:49.357Z,
logstash         |         "offset" => 141,
logstash         |       "@version" => "1",
logstash         |           "beat" => {
logstash         |             "name" => "87f6feae71e9",
logstash         |         "hostname" => "87f6feae71e9",
logstash         |          "version" => "6.0.0-rc1"
logstash         |     },
logstash         |           "host" => "87f6feae71e9",
logstash         |     "prospector" => {
logstash         |         "type" => "log"
logstash         |     },
logstash         |         "source" => "/mnt/log/test.XML",
logstash         |        "message" => "  xmlns:foo=\"http://www.foo.no\" ",
logstash         |           "tags" => [
logstash         |         [0] "beats_input_codec_plain_applied",
logstash         |         [1] "_xmlparsefailure",
logstash         |         [2] "_split_type_failure"
logstash         |     ]
logstash         | }
logstash         | {
logstash         |     "@timestamp" => 2017-10-13T14:27:49.357Z,
logstash         |         "offset" => 198,
logstash         |       "@version" => "1",
logstash         |           "beat" => {
logstash         |             "name" => "87f6feae71e9",
logstash         |         "hostname" => "87f6feae71e9",
logstash         |          "version" => "6.0.0-rc1"
logstash         |     },
logstash         |           "host" => "87f6feae71e9",
logstash         |     "prospector" => {
logstash         |         "type" => "log"
logstash         |     },
logstash         |         "source" => "/mnt/log/test.XML",
logstash         |        "message" => "  xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\">",
logstash         |           "tags" => [
logstash         |         [0] "beats_input_codec_plain_applied",
logstash         |         [1] "_xmlparsefailure",
logstash         |         [2] "_split_type_failure"
logstash         |     ]
logstash         | }
...

Current logstash.conf:

# Pipeline: Filebeat ships each XML file as a single multiline event;
# parse it, split per <foo:visit>, and index one document per visit.
input {
        beats {
                port => 5000
        }
}

filter {
        # First pass over the whole document: pull out the file-level
        # metadata and collect every <foo:visit> node as a string.
        xml {
                # Bind the prefixes used in the XPath expressions; without
                # this Nokogiri rejects the expressions outright.
                namespaces => {
                        "foo" => "http://www.foo.no"
                        "xsi" => "http://www.w3.org/2001/XMLSchema-instance"
                }
                source => "message"
                store_xml => "false"

                xpath => ["/foo:statistics/foo:start/text()", "start"]
                xpath => ["/foo:statistics/foo:stop/text()", "stop"]
                # <foo:name> and <foo:id> are namespace-qualified in the
                # source document, so they need the foo: prefix as well.
                xpath => ["/foo:statistics/foo:place/foo:name/text()", "place_name"]
                xpath => ["/foo:statistics/foo:place/foo:id/text()", "place_id"]
                xpath => ["/foo:statistics/foo:visit", "visits"]
        }

        # One event per visit; the raw document is no longer needed.
        split {
                field => "visits"
                remove_field => "message"
        }

        # Second pass on the single extracted <foo:visit> fragment.
        xml {
                # The serialized fragment is still in the foo namespace, so
                # this filter needs the same prefix binding as the first one.
                namespaces => {
                        "foo" => "http://www.foo.no"
                }
                source => "visits"
                store_xml => "false"
                xpath => ["/foo:visit/foo:date/text()", "date"]
                xpath => ["/foo:visit/foo:status/text()", "status"]
                remove_field => "visits"
        }

        # xpath always produces arrays; flatten date/status to plain
        # strings before the date filter parses them.
        mutate {
                join => {
                        "date" => ""
                        "status" => ""
                }
        }

        # Use the visit's own timestamp as the event @timestamp.
        date {
                match => ["date", "ISO8601"]
        }
}

output {
        stdout { codec => rubydebug }
        elasticsearch {
                hosts => "elasticsearch:9200"
                index => "maaling-%{+YYYY.MM.dd}"
        }
}

Yeah, you're getting the file line by line (which is expected). Make sure you do any required multiline processing on the Filebeat side.

@magnusbaeck: Thank you very much!

For anyone in the future with similar problems with XML through Filebeat, Logstash and Elasticsearch, here is the filebeat.yml:

# Filebeat config: ship each whole XML file as ONE multiline event to Logstash.
filebeat.prospectors:
- type: log
  paths:
    - /mnt/log/*.ATKSTAT
  # NOTE(review): the XML declaration says iso-8859-1, not windows-1252;
  # they agree on the printable Latin-1 range, but confirm the files never
  # contain 0x80-0x9F characters before relying on this.
  encoding: 'windows-1252'
  # A new event starts at the XML declaration line ("<?xml ...").
  multiline.pattern: '^\<\?'
  # Lines that do NOT match the pattern are treated as continuations...
  multiline.negate: true
  # ...and appended after the matching line, so the whole file is one event.
  multiline.match: after
  fields_under_root: true

output.logstash:
  hosts: ["logstash:5000"]

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.