XML Input Issues

I posted a topic about this earlier, but it got bogged down with craziness; I'm hoping that is the reason there was limited assistance from the community.

I am trying to ingest DMARC aggregate XML reports. Here is my pipeline:

input {
  file {
    path => "C:/DMARC/*.xml"
    discover_interval => 5
  }
}
filter {
  xml {
    target => "doc"
    source => "message"
    force_array => false
    remove_namespaces => true
  }
}
output {
  elasticsearch {
    hosts => "ElasticSearch:9200"
    user => "elastic"
    password => "elastic"
    http_compression => true
    manage_template => false
    index => "dmarcxml-%{+YYYY.MM.dd}"
  }
}

Here's a sample of the data that is being ingested:

<?xml version="1.0" encoding="windows-1252"?><feedback  xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'  xmlns:ns1='http://dmarc.org/dmarc-xml/0.1'  xsi:schemaLocation='http://dmarc.org/dmarc-xml/0.1 dmarc_agg_report.xsd'><report_metadata><org_name>AOL</org_name><email>postmaster@aol.com</email><report_id>example.com_1517011200</report_id><date_range><begin>1516924800</begin><end>1517011200</end></date_range></report_metadata>

<policy_published><domain>example.com</domain><adkim>r</adkim><aspf>r</aspf><p>none</p><sp>none</sp><pct>100</pct></policy_published>
<record><row><source_ip>192.168.1.1</source_ip><count>1</count><policy_evaluated><disposition>none</disposition><spf>fail</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><dkim><domain>not.evaluated</domain><result>none</result></dkim><spf><domain>example.com</domain><scope>mfrom</scope><result>permerror</result></spf></auth_results></record>
<record><row><source_ip>192.168.1.1</source_ip><count>1</count><policy_evaluated><disposition>none</disposition><spf>fail</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><dkim><domain>not.evaluated</domain><result>none</result></dkim><spf><domain>example.com</domain><scope>mfrom</scope><result>permerror</result></spf></auth_results></record>
<record><row><source_ip>204.232.172.40</source_ip><count>1</count><policy_evaluated><disposition>none</disposition><spf>fail</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><dkim><domain>not.evaluated</domain><result>none</result></dkim><spf><domain>example.com</domain><scope>mfrom</scope><result>permerror</result></spf></auth_results></record>
<record><row><source_ip>192.168.1.2</source_ip><count>2</count><policy_evaluated><disposition>none</disposition><spf>fail</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><dkim><domain>not.evaluated</domain><result>none</result></dkim><spf><domain>example.com</domain><scope>mfrom</scope><result>permerror</result></spf></auth_results></record>

Problem 1:
The filter doesn't seem to know how to handle the feedback element (attribute?) and its namespace declarations; it complains there is no closing tag and ignores some of the XML. Here is the error I see when attempting to ingest a DMARC aggregate report:

[2018-02-06T23:02:31,358][WARN ][logstash.filters.xml ] Error parsing xml with XmlSimple {:source=>"message", :value=>"<?xml version="1.0" encoding="windows-1252"?<report_metadata><org_name>AOL</org_name>postmaster@aol.com<report_id>example.com_1517011200</report_id><date_range>1516924800151701 1200</date_range></report_metadata>", :exception=>#<REXML::ParseException: No close tag for /feedback Line: 1 Position: 438 Last 80 unconsumed characters: >, :backtrace=>["C:/Logstash/vendor/jruby/lib/ruby/stdlib/rexml/parsers/treeparser.rb:28:in parse'", "C:/Logstash/vendor/jruby/lib/ruby/stdlib/rexml/document.rb:288:in build'", "C:/Logstash/vendor/jruby/lib/ruby/stdlib/rexml/document.rb:45:in initialize'", "C:/Logstash/vendor/bundle/jruby/2.3.0/gems/xml-simple-1.1.5/lib/xmlsimple.rb:971:in parse'", "C:/Logstash/vendor/bundle/jruby/2.3.0/gems/xml-simple-1.1.5/lib/xmlsimple.rb:164:in xml_in'", "C:/Logstash/vendor/bundle/jruby/2.3.0/gems/xml-simple-1.1.5/lib/xmlsimple.rb:203:in xml_in'", "C:/Logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.5/lib/logstash/filters/xml.rb:187:in filter'", "C:/Logstash/logstash-core/lib/logstash/filters/base.rb:145:in do_filter'", "C:/Logstash/logstash-core/lib/logstash/filters/base.rb:164:in block in multi_filter'", "org/jruby/RubyArray.java:1734:in each'", "C:/Logstash/logstash-core/lib/logstash/filters/base.rb:161:in multi_filter'", "C:/Logstash/logstash-core/lib/logstash/filter_delegator.rb:48:in multi_filter'", "(eval):42:in block in filter_func'", "C:/Logstash/logstash-core/lib/logstash/pipeline.rb:455:in filter_batch'", "C:/Logstash/logstash-core/lib/logstash/pipeline.rb:434:in worker_loop'", "C:/Logstash/logstash-core/lib/logstash/pipeline.rb:393:in block in start_workers'"]}

Problem 2:
I attempted to use the xpath option to map fields, but I can't get it to work. With one or two xpath entries I get no errors, but I see no new fields in the Discover section of Kibana. I've tried the implementations below without any luck.

xpath => ["/report_metadata/org_name", "Reporting Org"]
xpath => [ "/report_metadata/org_name", "Reporting Org" ]
xpath => "/report_metadata/org_name", "Reporting Org"

When I try the below, the pipeline starts up, but when it tries to process an XML file an error is thrown saying the filter is misconfigured:
xpath => [
"/report_metadata/org_name", "Reporting Org",
"/report_metadata/email", "Org Contact",
"/report_metadata/report_id", "Report ID",
"/report_metadata/date_range/begin", "Start Date",
"/report_metadata/date_range/end", "End Date",
"/policy_published/domain", "Policy Domain",
"/policy_published/aspf", "SPF Mode",
"/policy_published/adkim", "DKIM Mode",
"/policy_published/p", "DMARC Policy Action",
"/policy_published/sp", "DMARC Sub-Domain Action",
"/policy_published/pct", "Application Percentage",
"/record/row/source_ip", "Sender IP",
"/record/row/count", "Message Count",
"/record/row/policy_evaluated/disposition", "Policy Disposition",
"/record/row/policy_evaluated/spf", "SPF Disposition",
"/record/identifiers/header_from", "Message Header",
"/record/auth_results/dkim/domain", "DKIM Domain",
"/record/auth_results/dkim/result", "DKIM Result",
"/record/auth_results/spf/domain", "SPF Domain",
"/record/auth_results/spf/scope", "SPF Scope",
"/record/auth_results/spf/result", "SPF Result"
]

Problem 3:
This is more about the appearance of the data once it's in Logstash. I'd like the fields to have different names. I tried reading the documentation on creating a template, but I keep getting stuck on how Elasticsearch maps a value to a field name. If someone could lend me a hand starting the template, so I have an understanding of what needs to be done with known raw data, I would appreciate it.

The filter doesn't seem to know how to handle the feedback element (attribute?) and its namespace declarations; it complains there is no closing tag and ignores some of the XML.

Well, the lack of a closing tag means it's not valid XML, and then Logstash can't parse it. If you can't fix the problem at the source, you could use Logstash to append "</feedback>" to the field contents before you pass it to the xml filter.
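
Something along these lines before the xml filter might do it (just a sketch; it assumes the whole report ends up in the message field and only the final closing tag is missing):

filter {
  mutate {
    # Append the missing closing tag so the document becomes well-formed XML
    # before the xml filter tries to parse it.
    replace => { "message" => "%{message}</feedback>" }
  }
}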

When I try the below, the pipeline starts up but when it tries to process an XML, an error is thrown saying the filter is misconfigured

Please quote the full error message.

This is more about the appearance of the data once in Logstash. I'd like the fields to have a different name.

You can use a mutate filter to rename fields.
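
For example (a sketch; the exact source path depends on how the xml filter has nested the parsed data under your target, and nested fields are referenced with [bracket] syntax rather than dots):

filter {
  mutate {
    # Rename a nested field created by the xml filter (target => "doc").
    # Note the bracket syntax; "doc.report_metadata.org_name" would be
    # treated as a single literal field name.
    rename => { "[doc][report_metadata][org_name]" => "Reporting Org" }
  }
}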

Below is the error. I am using the Centralized Pipeline Manager to create/edit the pipeline. I am not copying/pasting text into the editor; I am typing it in directly. I had to split the error across two posts due to its length.

`[2018-02-06T22:50:52,249][ERROR][logstash.pipeline ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"XML_DMARC", "exception"=>"ASCII-8BIT", "backtrace"=>["java.nio.charset.Charset.forName(Charset.java:531)", "nokogiri.internals.SaveContextVisitor.encodeStringToHtmlEntity(SaveContextVisitor.java:758)", "nokogiri.internals.SaveContextVisitor.enter(SaveContextVisitor.java:750)", "nokogiri.XmlText.accept(XmlText.java:92)", "nokogiri.XmlElement.accept(XmlElement.java:78)", "nokogiri.XmlNode.native_write_to(XmlNode.java:1272)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.nokogiri_minus_1_dot_8_dot_1_minus_java.lib.nokogiri.xml.node.RUBY$method$write_to$0(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.1-java/lib/nokogiri/xml/node.rb:699)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.nokogiri_minus_1_dot_8_dot_1_minus_java.lib.nokogiri.xml.node.RUBY$method$serialize$0(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.1-java/lib/nokogiri/xml/node.rb:631)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.nokogiri_minus_1_dot_8_dot_1_minus_java.lib.nokogiri.xml.node.RUBY$method$to_xml$0(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.1-java/lib/nokogiri/xml/node.rb:654)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.nokogiri_minus_1_dot_8_dot_1_minus_java.lib.nokogiri.xml.node.RUBY$method$to_s$0(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.1-java/lib/nokogiri/xml/node.rb:513)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_xml_minus_4_dot_0_dot_5.lib.logstash.filters.xml.RUBY$block$filter$2(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.5/lib/logstash/filters/xml.rb:171)", "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:156)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.ir.runtime.IRRuntimeHelpers.yield(IRRuntimeHelpers.java:415)", "org.jruby.ir.targets.YieldSite.yield

YieldSite.java:87)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.nokogiri_minus_1_dot_8_dot_1_minus_java.lib.nokogiri.xml.node_set.RUBY$block$each$1(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.1-java/lib/nokogiri/xml/node_set.rb:190)", "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:156)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.RubyInteger.fixnumUpto(RubyInteger.java:162)", "org.jruby.RubyInteger.upto(RubyInteger.java:134)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.nokogiri_minus_1_dot_8_dot_1_minus_java.lib.nokogiri.xml.node_set.RUBY$method$each$0(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/nokogiri-1.8.1-java/lib/nokogiri/xml/node_set.rb:189)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_xml_minus_4_dot_0_dot_5.lib.logstash.filters.xml.RUBY$block$filter$1(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.5/lib/logstash/filters/xml.rb:159)", "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:156)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.RubyHash$12.visit(RubyHash.java:1362)", "org.jruby.RubyHash$12.visit(RubyHash.java:1359)", "org.jruby.RubyHash.visitLimited(RubyHash.java:662)", "org.jruby.RubyHash.visitAll(RubyHash.java:647)", "org.jruby.RubyHash.iteratorVisitAll(RubyHash.java:1319)", "org.jruby.RubyHash.each_pairCommon(RubyHash.java:1354)", "org.jruby.RubyHash.each(RubyHash.java:1343)", "C_3a_.Logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_xml_minus_4_dot_0_dot_5.lib.logstash.filters.xml.RUBY$method$filter$0(C:/Logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.5/lib/logstash/filters/xml.rb:152)", "C_3a_.Logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$do_filter$0(C:/Logstash/logstash-core/lib/logstash/filters/base.rb:145)", "C_3a_.Logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$block$multi_filter$1(C:/Logstash/logstash-core/lib/logstash/filters/base.rb:164)", "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:156)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.RubyArray.each(RubyArray.java:1734)", "C_3a_.Logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$multi_filter$0(C:/Logstash/logstash-core/lib/logstash/filters/base.rb:161)", "C_3a_.Logstash.logstash_minus_core.lib.logstash.filter_delegator.RUBY$method$multi_filter$0(C:/Logstash/logstash-core/lib/logstash/filter_delegator.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:163)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:200)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:338)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:163)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:73)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:132)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:148)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:73)", 
"org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:289)", "org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:63)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:204)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:198)", "C_3a_.Logstash.logstash_minus_core.lib.logstash.pipeline.RUBY$method$filter_batch$0(C:/Logstash/logstash-core/lib/logstash/pipeline.rb:455)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:103)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:163)", "org.jruby.ir.targets.InvokeSite.fail(InvokeSite.java:198)", "C_3a_.Logstash.logstash_minus_core.lib.logstash.pipeline.RUBY$method$worker_loop$0(C:/Logstash/logstash-core/lib/logstash/pipeline.rb:434)", "C_3a_.Logstash.logstash_minus_core.lib.logstash.pipeline.RUBY$method$worker_loop$0$__VARARGS__(C:/Logstash/logstash-core/lib/logstash/pipeline.rb)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:77)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:93)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:145)", "C_3a_.Logstash.logstash_minus_core.lib.logstash.pipeline.RUBY$block$start_workers$2(C:/Logstash/logstash-core/lib/logstash/pipeline.rb:393)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:145)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:71)", "org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:289)", "org.jruby.RubyProc.call(RubyProc.java:246)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:104)", "java.lang.Thread.run(Thread.java:748)"], :thread=>"#<Thread:0x77e96bbd sleep>"}

It... looks like there's a reference to a character set named ASCII-8BIT somewhere, but that character set isn't recognized by Logstash. I haven't used the centralized pipeline management feature, so I can't comment on its relevance.
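
If the files really are Windows-1252 encoded, as their XML declaration claims, it might also help to tell the input codec that explicitly so the text is converted to UTF-8 before the filters see it (a sketch, untested):

input {
  file {
    path => "C:/DMARC/*.xml"
    # Declare the on-disk encoding so Logstash transcodes it correctly.
    codec => plain { charset => "Windows-1252" }
  }
}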

Maybe it's a character processing limit or the browser I am using (MS Edge); I can do some more testing tonight to try to find out. Although, as I mentioned in the original post, I first tried a single field, but that field never appears in Kibana, neither in the Discover section nor after refreshing the index pattern. There is also no error referencing it, even at debug level. Maybe I am misunderstanding what xpath's function is: I am under the impression that by using the command below, a field called Reporting Org will be generated and the value of the XML tag at /report_metadata/org_name will be used. Or could it be because that xpath location exists at multiple points throughout the file? See the original post for sample data, which only contains four records, but the actual files could contain hundreds.

xpath => ["/report_metadata/org_name", "Reporting Org"]
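
For instance, I wonder whether the expression needs to be anchored at the document root and whether selecting the text node would give just the value rather than the whole element, something like:

xpath => [ "/feedback/report_metadata/org_name/text()", "Reporting Org" ]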

BTW, I'd like to thank you for your assistance. This is the most helpful interaction I've received on these forums.

Apologies, I failed to look at the entire document; feedback DOES have a closing tag at the very end of the document, where you would expect to find it. Below is the structure of one of these reports; there can be dozens of records in each file. Now I don't understand why it thinks feedback doesn't have a closing tag.

<?xml version="1.0" encoding="WINDOWS-1252"?>

<feedback xsi:schemaLocation="http://dmarc.org/dmarc-xml/0.1 dmarc_agg_report.xsd" xmlns:ns1="http://dmarc.org/dmarc-xml/0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <report_metadata>...</report_metadata>
  <policy_published>...</policy_published>
  <record>...</record>
  <record>...</record>
  <record>...</record>
  <record>...</record>
</feedback>

Is there something malformed in the feedback/namespace section? I don't see it. I tried retyping the pipeline into the browser from scratch, used IE 11, and configured the pipeline in Notepad and Notepad++ and then copied the contents into the browser. None of that worked.

Where is the documentation on how to remove lines before processing with the XML filter? I see options to remove various things (whitespace, fields, values) but not to remove a line.

I have a feeling there is something wrong with the centralized pipeline management function. I've manually pulled the XML declaration and feedback sections out of a test file, and mutate, xpath, and geoip do not seem to work.

filter {
  xml {
    target => "doc"
    source => "message"
    force_array => false
  }
  mutate {
    rename => { 
      "doc.row.count" => "Message.Count"
    }
  }
  geoip {
    source => "doc.row.source_ip"
  }
}

I get no errors during ingest, but I do not see the doc.row.count field change to Message.Count, nor do I see a new field named Message.Count. I have refreshed the index pattern and tried deleting the index and index pattern with no change. GeoIP, on the other hand, just injects a tag saying geoip_lookup_failure, but that's an issue for another thread.

Where is the documentation on how to remove lines before processing with the XML filter? I see options to remove various things (whitespace, fields, values) but not to remove a line.

The mutate filter's gsub option should be usable for deleting stuff from a field value.
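
For example, something like this ahead of the xml filter should strip the XML declaration (a sketch; adjust the pattern to whatever you need to remove):

filter {
  mutate {
    # Delete the XML declaration from the raw message before parsing.
    gsub => [ "message", "<[?]xml[^>]*[?]>", "" ]
  }
}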

I get no errors during ingest, but I do not see the doc.row.count field change to Message.Count, nor do I see a new field named Message.Count. I have refreshed the index pattern and tried deleting the index and index pattern with no change. GeoIP, on the other hand, just injects a tag saying geoip_lookup_failure, but that's an issue for another thread.

Please show an example document. Copy/paste from Kibana's JSON tab so we can see the raw data.

Updates updates updates
There appears to be some sort of issue with the centralized pipeline management editor. I spun up Elastic Stack 6.1.1 on a single VM with no X-Pack plugin, copied in my pipeline with the xpath statements, and Logstash fired up and parsed the XMLs I fed it without crashing. However, I still get the same error about the tag not having a closing tag, even though there is a closing tag at the end of the document.

Now that the xpath functions are working I have a new problem, possibly a misunderstanding of how they work. Below is the pipeline config and a screenshot of the resulting output. Notice that the values include the XML tags; I didn't anticipate or want that data in the value. Is that how it is designed to function, or do I have something wrong in the pipeline?

Current Pipeline:
input {
  file {
    path => "C:/DMARC/*.xml"
    discover_interval => 5
  }
}
filter {
  xml {
    target => "doc"
    source => "message"
    force_array => false
    xpath => [
      "/report_metadata/org_name", "Reporting Org",
      "/report_metadata/email", "Org Contact",
      "/report_metadata/report_id", "Report ID",
      "/report_metadata/date_range/begin", "Start Date",
      "/report_metadata/date_range/end", "End Date",
      "/policy_published/domain", "Policy Domain",
      "/policy_published/aspf", "SPF Mode",
      "/policy_published/adkim", "DKIM Mode",
      "/policy_published/p", "DMARC Policy Action",
      "/policy_published/sp", "DMARC Sub-Domain Action",
      "/policy_published/pct", "Application Percentage",
      "/record/row/source_ip", "Sender IP",
      "/record/row/count", "Message Count",
      "/record/row/policy_evaluated/disposition", "Policy Disposition",
      "/record/row/policy_evaluated/spf", "SPF Disposition",
      "/record/identifiers/header_from", "Message Header",
      "/record/auth_results/dkim/domain", "DKIM Domain",
      "/record/auth_results/dkim/result", "DKIM Result",
      "/record/auth_results/spf/domain", "SPF Domain",
      "/record/auth_results/spf/scope", "SPF Scope",
      "/record/auth_results/spf/result", "SPF Result"
    ]
  }
  mutate {
    rename => {
      "doc.row.count" => "Message.Count"
    }
    rename => {
      "doc.row.source_ip" => "Source.IP"
    }
    rename => {
      "doc.identifiers.header_from" => "From.Header"
    }
  }
}
# geoip {
#   source => "doc.row.source_ip"
# }
output {
  elasticsearch {
    hosts => ["ElasticStack:9200"]
#   user => "elastic"
#   password => "elastic"
    http_compression => true
    manage_template => false
    index => "dmarcxml-%{+YYYY.MM.dd}"
  }
}

Appearance of Output:

Alright, it appears I solved all my big problems, though there has to be a more efficient way of doing this. Any ideas?

Raw XML data is fed into this pipeline:

input {
      file {
        path => "C:/DMARC/*.xml"
        discover_interval => 5
      }
    }
    filter {
      xml {
        store_xml => false
        source => "message"
        xpath => [
          "/feedback/report_metadata/org_name", "Reporting Org",
          "/feedback/report_metadata/email", "Org Contact",
          "/feedback/report_metadata/report_id", "Report ID",
          "/feedback/report_metadata/date_range/begin", "Start Date",
          "/feedback/report_metadata/date_range/end", "End Date",
          "/feedback/policy_published/domain", "Policy Domain",
          "/feedback/policy_published/aspf", "SPF Mode",
          "/feedback/policy_published/adkim", "DKIM Mode",
          "/feedback/policy_published/p", "DMARC Policy Action",
          "/feedback/policy_published/sp", "DMARC Sub-Domain Action",
          "/feedback/policy_published/pct", "Application Percentage",
          "/record/row/source_ip", "Sender IP",
          "/record/row/count", "Message Count",
          "/record/row/policy_evaluated/disposition", "Policy Disposition",
          "/record/row/policy_evaluated/spf", "SPF Disposition",
          "/record/identifiers/header_from", "Message Header",
          "/record/auth_results/dkim/domain", "DKIM Domain",
          "/record/auth_results/dkim/result", "DKIM Result",
          "/record/auth_results/spf/domain", "SPF Domain",
          "/record/auth_results/spf/scope", "SPF Scope",
          "/record/auth_results/spf/result", "SPF Result"
        ]
      }
      mutate {
        gsub => [ "Reporting Org", "<org_name>", "",
                  "Reporting Org", "</org_name>", "",
                  "Org Contact", "<email>", "",
                  "Org Contact", "</email>", "",
                  "Report ID", "<report_id>", "",
                  "Report ID", "</report_id>", "",
                  "Start Date", "<begin>", "",
                  "Start Date", "</begin>", "",
                  "End Date", "<end>", "",
                  "End Date", "</end>", "",
                  "Policy Domain", "<domain>", "",
                  "Policy Domain", "</domain>", "",
                  "SPF Mode", "<aspf>", "",
                  "SPF Mode", "</aspf>", "",
                  "DKIM Mode", "<adkim>", "",
                  "DKIM Mode", "</adkim>", "",
                  "DMARC Policy Action", "<p>", "",
                  "DMARC Policy Action", "</p>", "",
                  "DMARC Sub-Domain Action", "<sp>", "",
                  "DMARC Sub-Domain Action", "</sp>", "",
                  "Application Percentage", "<pct>", "",
                  "Application Percentage", "</pct>", "",
                  "Sender IP", "<source_ip>", "",
                  "Sender IP", "</source_ip>", "",
                  "Message Count", "<count>", "",
                  "Message Count", "</count>", "",
                  "Policy Disposition", "<disposition>", "",
                  "Policy Disposition", "</disposition>", "",
                  "SPF Disposition", "<spf>", "",
                  "SPF Disposition", "</spf>", "",
                  "Message Header", "<header_from>", "",
                  "Message Header", "</header_from>", "",
                  "DKIM Domain", "<domain>", "",
                  "DKIM Domain", "</domain>", "",
                  "DKIM Result", "<result>", "",
                  "DKIM Result", "</result>", "",
                  "SPF Domain", "<domain>", "",
                  "SPF Domain", "</domain>", "",
                  "SPF Result", "<result>", "",
                  "SPF Result", "</result>", "",
                  "SPF Scope", "<scope>", "",
                  "SPF Scope", "</scope>", ""
                ]
      }
      mutate {
        convert => { "Message Count" => "integer" }
      }
      geoip {
        source => "Sender IP"
      }
    }
    output {
      elasticsearch {
        hosts => ["ElasticStack:9200"]
    #   user => "elastic"
    #   password => "elastic"
        http_compression => true
        manage_template => false
        index => "dmarcxml-%{+YYYY.MM.dd}"
      }
    }

Looks like I spoke too soon. I pulled another aggregate report and tried to ingest it, and everything went south. The aggregate report from AOL contains extra stuff in the feedback tag, but everything else looks the same; unfortunately, that totally alters how the other XML aggregate report is handled. The structure is otherwise still the same, so now I'm lost once more. I've reverted my pipeline to a bare-bones filter (and the missing-closing-tag error is back, but on <record> now) and get the output below. Notice all the extra whitespace; what's that about??

Raw XML
<?xml version='1.0' encoding='utf-8'?>
<feedback><report_metadata><org_name>Mail.Ru</org_name><email>dmarc_support@corp.mail.ru</email><extra_contact_info>http://help.mail.ru/mail-help</extra_contact_info><report_id>37256247916566362691518220800</report_id><date_range><begin>1518220800</begin><end>1518307200</end></date_range></report_metadata><policy_published><domain>example.com</domain><adkim>r</adkim><aspf>r</aspf><p>none</p><sp>none</sp><pct>100</pct></policy_published><record><row><source_ip>192.168.1.1</source_ip><count>1</count><policy_evaluated><disposition>none</disposition><dkim>fail</dkim><spf>pass</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><spf><domain>example.com</domain><scope>mfrom</scope><result>pass</result></spf></auth_results></record></feedback>

Pipeline Config
input {
  file {
    path => "C:/DMARC/*.xml"
    discover_interval => 5
  }
}
filter {
  xml {
    target => "doc"
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["ElasticStack:9200"]
#   user => "elastic"
#   password => "elastic"
    http_compression => true
    manage_template => false
    index => "dmarcxml-%{+YYYY.MM.dd}"
  }
}

Output Results

I've turned on trace logging looking for more clues and, while this doesn't fix my issue, it does point to a possible bug in the XML filtering: when the root termination tag is on the same line as other XML data, that data does not get parsed.
<root>
<entry><field1>data</field1></entry>
<entry><field1>data</field1></entry></root>

This results in only the first entry being parsed, with the last one being skipped. Real-world example:

<?xml version="1.0" encoding="windows-1252"?><feedback  xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'  xmlns:ns1='http://dmarc.org/dmarc-xml/0.1'  xsi:schemaLocation='http://dmarc.org/dmarc-xml/0.1 dmarc_agg_report.xsd'><report_metadata><org_name>AOL</org_name><email>postmaster@aol.com</email><report_id>example.com_1517011200</report_id><date_range><begin>1516924800</begin><end>1517011200</end></date_range></report_metadata>

<policy_published><domain>example.com</domain><adkim>r</adkim><aspf>r</aspf><p>none</p><sp>none</sp><pct>100</pct></policy_published>
<record><row><source_ip>192.168.1.1</source_ip><count>1</count><policy_evaluated><disposition>none</disposition><spf>fail</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><dkim><domain>not.evaluated</domain><result>none</result></dkim><spf><domain>example.com</domain><scope>mfrom</scope><result>permerror</result></spf></auth_results></record>
<record><row><source_ip>74.208.4.196</source_ip><count>10</count><policy_evaluated><disposition>none</disposition><spf>fail</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><dkim><domain>not.evaluated</domain><result>none</result></dkim><spf><domain>example.com</domain><scope>mfrom</scope><result>permerror</result></spf></auth_results></record></feedback>

The last record, with the 74.208.4.196 IP address, does not get parsed. However, if I put a return in and place the root closing tag on its own line, the record gets processed.

<?xml version="1.0" encoding="windows-1252"?><feedback  xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'  xmlns:ns1='http://dmarc.org/dmarc-xml/0.1'  xsi:schemaLocation='http://dmarc.org/dmarc-xml/0.1 dmarc_agg_report.xsd'><report_metadata><org_name>AOL</org_name><email>postmaster@aol.com</email><report_id>example.com_1517011200</report_id><date_range><begin>1516924800</begin><end>1517011200</end></date_range></report_metadata>

<policy_published><domain>example.com</domain><adkim>r</adkim><aspf>r</aspf><p>none</p><sp>none</sp><pct>100</pct></policy_published>
<record><row><source_ip>192.168.1.1</source_ip><count>1</count><policy_evaluated><disposition>none</disposition><spf>fail</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><dkim><domain>not.evaluated</domain><result>none</result></dkim><spf><domain>example.com</domain><scope>mfrom</scope><result>permerror</result></spf></auth_results></record>
<record><row><source_ip>74.208.4.196</source_ip><count>10</count><policy_evaluated><disposition>none</disposition><spf>fail</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><dkim><domain>not.evaluated</domain><result>none</result></dkim><spf><domain>example.com</domain><scope>mfrom</scope><result>permerror</result></spf></auth_results></record>
</feedback>

The image below is from Kibana. The records with timestamp 11:15:14 are from the run with the closing feedback tag on the same line as the last record; the 11:19:32 records are from the run with the closing feedback tag on its own line. Notice the additional record for the 74.208 IP address.

As for the current problem with messages not parsing right, I assume it is because the XML filter is supposed to be parsing data based on what is in the message field. When I give it the following data:

<?xml version='1.0' encoding='utf-8'?>
<feedback><report_metadata><org_name>Mail.Ru</org_name><email>dmarc_support@corp.mail.ru</email><extra_contact_info>http://help.mail.ru/mail-help</extra_contact_info><report_id>37256247916566362691518220800</report_id><date_range><begin>1518220800</begin><end>1518307200</end></date_range></report_metadata><policy_published><domain>example.com</domain><adkim>r</adkim><aspf>r</aspf><p>none</p><sp>none</sp><pct>100</pct></policy_published><record><row><source_ip>192.168.1.1</source_ip><count>1</count><policy_evaluated><disposition>none</disposition><dkim>fail</dkim><spf>pass</spf></policy_evaluated></row><identifiers><header_from>example.com</header_from></identifiers><auth_results><spf><domain>example.com</domain><scope>mfrom</scope><result>pass</result></spf></auth_results></record></feedback>

The following is output to Elasticsearch which, to my untrained eyes, indicates that the XML filter is not putting the data in the message field prior to parsing. If that's the case, though, then where is it putting it, and what should my source setting be?

[2018-02-11T11:51:07,544][WARN ][logstash.filters.xml     ] Error parsing xml with XmlSimple {:source=>"message", :value=>"<?xml version='1.0' encoding='utf-8'?>", :exception=>#<NoMethodError: undefined method `attributes' for nil:NilClass>, :backtrace=>["C:/ELK/logstash/vendor/bundle/jruby/2.3.0/gems/xml-simple-1.1.5/lib/xmlsimple.rb:720:in `get_attributes'", "C:/ELK/logstash/vendor/bundle/jruby/2.3.0/gems/xml-simple-1.1.5/lib/xmlsimple.rb:464:in `collapse'", "C:/ELK/logstash/vendor/bundle/jruby/2.3.0/gems/xml-simple-1.1.5/lib/xmlsimple.rb:194:in `xml_in'", "C:/ELK/logstash/vendor/bundle/jruby/2.3.0/gems/xml-simple-1.1.5/lib/xmlsimple.rb:203:in `xml_in'", "C:/ELK/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-xml-4.0.5/lib/logstash/filters/xml.rb:187:in `filter'", "C:/ELK/logstash/logstash-core/lib/logstash/filters/base.rb:145:in `do_filter'", "C:/ELK/logstash/logstash-core/lib/logstash/filters/base.rb:164:in `block in multi_filter'", "org/jruby/RubyArray.java:1734:in `each'", "C:/ELK/logstash/logstash-core/lib/logstash/filters/base.rb:161:in `multi_filter'", "C:/ELK/logstash/logstash-core/lib/logstash/filter_delegator.rb:48:in `multi_filter'", "(eval):94:in `block in filter_func'", "C:/ELK/logstash/logstash-core/lib/logstash/pipeline.rb:455:in `filter_batch'", "C:/ELK/logstash/logstash-core/lib/logstash/pipeline.rb:434:in `worker_loop'", "C:/ELK/logstash/logstash-core/lib/logstash/pipeline.rb:393:in `block in start_workers'"]}

[2018-02-11T11:39:19,314][DEBUG][logstash.pipeline        ] output received {"event"=>{"@timestamp"=>2018-02-11T17:39:19.267Z, "path"=>"C:/DMARC/mail.ru!stlouisco.com!1518220800!1518307200 - Copy.xml", "host"=>"Elasticstack", "@version"=>"1", "message"=>"
    <?xml version='1.0' encoding='utf-8'?>"}}
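
If the file input is handing the xml filter one line at a time, that would explain why the message field here only contains the XML declaration. A multiline codec that glues lines onto the previous event until a whole report has been read might be what's needed; a rough sketch (untested):

input {
  file {
    path => "C:/DMARC/*.xml"
    codec => multiline {
      # Any line that does not contain the XML declaration is appended to the
      # previous event, so an entire report becomes one message.
      pattern => "<[?]xml"
      negate => true
      what => "previous"
    }
  }
}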

I know I'm sending a barrage of posts here, but I just want to post the below to show that real progress is being made; I'm probably at about 95% complete and just need some help with that final 5%. I've literally tested at least 200 different combinations just to stumble onto the right settings. Again, I'm sure there's a better way to do all this, so if anybody has anything, please chime in.

Current pipe:

input {
  file {
    path => "C:/DMARC/*.xml"
    discover_interval => 5
    codec => multiline {
      negate => true
      pattern => "<record>"
      what => "previous"
    }
  }
}
filter {
  xml {
    force_array => true
    store_xml => false
    source => "message"
    xpath => [
      "feedback/report_metadata/org_name/text()", "Reporting Org",
      "feedback/report_metadata/email/text()", "Org Contact",
      "feedback/report_metadata/report_id/text()", "Report ID",
      "feedback/report_metadata/date_range/begin/text()", "Start Date",
      "feedback/report_metadata/date_range/end/text()", "End Date",
      "feedback/policy_published/domain/text()", "Policy Domain",
      "feedback/policy_published/aspf/text()", "SPF Mode",
      "feedback/policy_published/adkim/text()", "DKIM Mode",
      "feedback/policy_published/p/text()", "DMARC Policy Action",
      "feedback/policy_published/sp/text()", "DMARC Sub-Domain Action",
      "feedback/policy_published/pct/text()", "Application Percentage",
      "record/row/source_ip/text()", "Sender IP",
      "record/row/count/text()", "Message Count",
      "record/row/policy_evaluated/disposition/text()", "Policy Disposition",
      "record/row/policy_evaluated/spf/text()", "SPF Disposition",
      "record/identifiers/header_from/text()", "Message Header",
      "record/auth_results/dkim/domain/text()", "DKIM Domain",
      "record/auth_results/dkim/result/text()", "DKIM Result",
      "record/auth_results/spf/domain/text()", "SPF Domain",
      "record/auth_results/spf/scope/text()", "SPF Scope",
      "record/auth_results/spf/result/text()", "SPF Result"
    ]
  }
  mutate {
    strip => [
      "Reporting Org",
      "Org Contact",
      "Report ID",
      "Start Date",
      "End Date",
      "Policy Domain",
      "SPF Mode",
      "DKIM Mode",
      "DMARC Policy Action",
      "DMARC Sub-Domain Action",
      "Application Percentage",
      "Sender IP",
      "Message Count",
      "Policy Disposition",
      "SPF Disposition",
      "Message Header",
      "DKIM Domain",
      "DKIM Result",
      "SPF Domain",
      "SPF Scope",
      "SPF Result"
      ]
    gsub => [ "Reporting Org", "<org_name>", "",
              "Reporting Org", "</org_name>", "",
              "Org Contact", "<email>", "",
              "Org Contact", "</email>", "",
              "Report ID", "<report_id>", "",
              "Report ID", "</report_id>", "",
              "Start Date", "<begin>", "",
              "Start Date", "</begin>", "",
              "End Date", "<end>", "",
              "End Date", "</end>", "",
              "Policy Domain", "<domain>", "",
              "Policy Domain", "</domain>", "",
              "SPF Mode", "<aspf>", "",
              "SPF Mode", "</aspf>", "",
              "DKIM Mode", "<adkim>", "",
              "DKIM Mode", "</adkim>", "",
              "DMARC Policy Action", "<p>", "",
              "DMARC Policy Action", "</p>", "",
              "DMARC Sub-Domain Action", "<sp>", "",
              "DMARC Sub-Domain Action", "</sp>", "",
              "Application Percentage", "<pct>", "",
              "Application Percentage", "</pct>", "",
              "Sender IP", "<source_ip>", "",
              "Sender IP", "</source_ip>", "",
              "Message Count", "<count>", "",
              "Message Count", "</count>", "",
              "Policy Disposition", "<disposition>", "",
              "Policy Disposition", "</disposition>", "",
              "SPF Disposition", "<spf>", "",
              "SPF Disposition", "</spf>", "",
              "Message Header", "<header_from>", "",
              "Message Header", "</header_from>", "",
              "DKIM Domain", "<domain>", "",
              "DKIM Domain", "</domain>", "",
              "DKIM Result", "<result>", "",
              "DKIM Result", "</result>", "",
              "SPF Domain", "<domain>", "",
              "SPF Domain", "</domain>", "",
              "SPF Result", "<result>", "",
              "SPF Result", "</result>", "",
              "SPF Scope", "<scope>", "",
              "SPF Scope", "</scope>", ""
            ]
  }
  mutate {
    convert => { "Message Count" => "integer" }
  }
  geoip {
    source => "Sender IP"
  }
}
output {
  elasticsearch {
    hosts => ["ElasticStack:9200"]
#   user => "elastic"
#   password => "elastic"
    http_compression => true
    manage_template => false
    index => "dmarcxml-%{+YYYY.MM.dd}"
  }
}

Kibana example dashboard (sensitive info redacted). I've also link enabled all IPs and domains to whois internet resources for quicker discoveries.

Removed the unnecessary gsub functions; it's much more resource efficient and faster now. I think the only remaining problem I have (and the source of the /feedback error I documented earlier) is that the file input keeps the file open to monitor for changes, which prevents the remaining section of the file from being processed. I came to this conclusion after noticing that the "missing" records get processed after the pipeline is modified and reloads; I suspect that when the reload command is issued, Logstash finishes processing and closes the files. I've specified the close_older option for the file input, but that doesn't seem to do it. Is there another option or input I should use instead?
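
One thing I haven't tried yet is the multiline codec's auto_flush_interval option, which, if I understand it correctly, flushes a pending multiline event after a number of seconds instead of holding it until another matching line (or a reload) arrives. Something like this (untested):

codec => multiline {
  negate => true
  pattern => "<record>"
  what => "previous"
  # Flush whatever has been accumulated if no new line arrives within
  # 5 seconds, so the tail of the file isn't held back.
  auto_flush_interval => 5
}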

These XML files originate as archives (both .zip and .gz) attached to emails. The emails themselves come from remote mail servers. I download the files then extract them to a common directory for processing.

input {
  file {
    id => "Ingest"
    path => "C:/DMARC/*.xml"
    discover_interval => 5
    close_older => 5
    codec => multiline {
      negate => true
      pattern => "<record>"
      what => "previous"
    }
  }
}
filter {
  xml {
    id => "Parse"
    force_array => true
    store_xml => false
    source => "message"
    xpath => [
      "feedback/report_metadata/org_name/text()", "Reporting Org",
      "feedback/report_metadata/email/text()", "Org Contact",
      "feedback/report_metadata/report_id/text()", "Report ID",
      "feedback/report_metadata/date_range/begin/text()", "Start Date",
      "feedback/report_metadata/date_range/end/text()", "End Date",
      "feedback/policy_published/domain/text()", "Policy Domain",
      "feedback/policy_published/aspf/text()", "SPF Mode",
      "feedback/policy_published/adkim/text()", "DKIM Mode",
      "feedback/policy_published/p/text()", "DMARC Policy Action",
      "feedback/policy_published/sp/text()", "DMARC Sub-Domain Action",
      "feedback/policy_published/pct/text()", "Application Percentage",
      "record/row/source_ip/text()", "Sender IP",
      "record/row/count/text()", "Message Count",
      "record/row/policy_evaluated/disposition/text()", "Policy Disposition",
      "record/row/policy_evaluated/spf/text()", "SPF Disposition",
      "record/identifiers/header_from/text()", "Message Header",
      "record/auth_results/dkim/domain/text()", "DKIM Domain",
      "record/auth_results/dkim/result/text()", "DKIM Result",
      "record/auth_results/spf/domain/text()", "SPF Domain",
      "record/auth_results/spf/scope/text()", "SPF Scope",
      "record/auth_results/spf/result/text()", "SPF Result"
    ]
  }
  mutate {
    id => "Strip"
    strip => [ "Reporting Org",
      "Org Contact",
      "Report ID",
      "Start Date",
      "End Date",
      "Policy Domain",
      "SPF Mode",
      "DKIM Mode",
      "DMARC Policy Action",
      "DMARC Sub-Domain Action",
      "Application Percentage",
      "Sender IP",
      "Message Count",
      "Policy Disposition",
      "SPF Disposition",
      "Message Header",
      "DKIM Domain",
      "DKIM Result",
      "SPF Domain",
      "SPF Scope",
      "SPF Result"
    ]
  }
  mutate {
    id => "Convert"
    convert => { "Message Count" => "integer" }
  }
  if [Sender IP] {
    geoip {
      id => "Geo-Locate"
      source => "Sender IP"
    }
  }
}
output {
  elasticsearch {
    hosts => ["ElasticStack:9200"]
#   user => "elastic"
#   password => "elastic"
    http_compression => true
    manage_template => false
    index => "dmarcxml-%{+YYYY.MM.dd}"
  }
}

Did you have to create a template for this? If so, care to share it? :slight_smile:

Thanks

Are you tackling DMARC as well? I've created all kinds of goodness if that's the case. I have a template, sample visualizations, basic Elastic Stack config files, a PowerShell script to modify the structure, and a setup guide. Once I resolve a data ingest issue that JUST cropped up, I'm gonna dump it all into the world for all to see, lol.