_dateparsefailure when parsing XML with logstash


(Jack Lauret) #1

Hi there!

I am trying to ingest my DMARC Reports via logstash. I manage to parse the XML, however I fail to get the correct timestamp for the events.

An XML example is:

<?xml version="1.0"?>
<feedback>
  <version>0.1</version>
  <report_metadata>
    <org_name>AMAZON-SES</org_name>
    <email>postmaster@amazonses.com</email>
    <report_id>ae2ae781-07d7-4562-8203-297b42968b11</report_id>
    <date_range>
      <begin>1529366400</begin>
      <end>1529452800</end>
    </date_range>
  </report_metadata>
  <policy_published>
    <domain>example.com</domain>
    <adkim>s</adkim>
    <aspf>r</aspf>
    <p>none</p>
    <sp>none</sp>
    <pct>100</pct>
    <fo>0</fo>
  </policy_published>
  <record>
    <row>
      <source_ip>190.190.190.190</source_ip>
      <count>1</count>
      <policy_evaluated>
        <disposition>none</disposition>
        <dkim>pass</dkim>
        <spf>fail</spf>
      </policy_evaluated>
    </row>
    <identifiers>
      <envelope_from>from.com</envelope_from>
      <header_from>example.com</header_from>
    </identifiers>
    <auth_results>
      <dkim>
        <domain>example.com</domain>
        <result>pass</result>
      </dkim>
      <spf>
        <domain>example.com</domain>
        <result>pass</result>
      </spf>
    </auth_results>
  </record>
</feedback>

I would like to use the end data; feedback\report_metadata\date_range\end as my timestamp. The value of this field is unix.

My pipeline looks like this:

input {
  file {
    path => "D:/DMARC/*.xml"
    discover_interval => 5
    start_position => "beginning"
    close_older => 60
    codec => multiline {
      pattern => "^<\?feedback .*\>"
      negate => "true"
      what => "previous"
    }
  }
}

filter {
  xml {
    source => "message"
    store_xml => false
    target => "feedback"
    force_array => false
    xpath => [
      "/feedback/report_metadata/org_name/text()", "dmarc_reporter",
      "/feedback/report_metadata/email/text()", "dmarc_reporter_email",
      "/feedback/report_metadata/report_id/text()", "dmarc_report_id",
      "/feedback/report_metadata/date_range/begin/text()", "dmarc_time_start",
      "/feedback/report_metadata/date_range/end/text()", "timestamp",
      "/feedback/policy_published/domain/text()", "dmarc_dns_domain",
      "/feedback/policy_published/aspf/text()", "dmarc_spf_alignment",
      "/feedback/policy_published/p/text()", "dmarc_policy",
      "/feedback/policy_published/sp/text()", "dmarc_sp",
      "/feedback/policy_published/pct/text()", "dmarc_pct",
      "/feedback/policy_published/adkim/text()", "dkim_alignment",
      "/feedback/record/row/source_ip/text()", "dmarc_source_ip",
      "/feedback/record/row/count/text()", "dmarc_source_ip_count",
      "/feedback/record/row/policy_evaluated/disposition/text()", "dmarc_disposition",
      "/feedback/record/row/policy_evaluated/dkim/text()", "dmarc_dkim_result",
      "/feedback/record/row/policy_evaluated/spf/text()", "dmarc_spf_result",
      "/feedback/record/identifiers/envelope_from/text()", "dmarc_env_from",
      "/feedback/record/identifiers/header_from/text()", "dmarc_hdr_from",
      "/feedback/record/auth_results/dkim/domain/text()", "dmarc_dkim_auth_domain",
      "/feedback/record/auth_results/dkim/result/text()", "dmarc_dkim_auth_result",
      "/feedback/record/auth_results/spf/domain/text()", "dmarc_spf_auth_domain",
      "/feedback/record/auth_results/spf/result/text()", "dmarc_spf_auth_result"
    ]
  }
  date {
    match => [ "timestamp", "UNIX" ]
  }
  mutate {
    gsub => [
      "dkim_alignment", "r", "Relaxed",
      "dkim_alignment", "s", "Strict",
      "dmarc_spf_alignment", "r", "Relaxed",
      "dmarc_spf_alignment", "s", "Strict"
    ]
  }
}
output {
  elasticsearch {
    hosts => ["elkserver.local:9200"]
    index => "logstash-dmarc-%{+xxxx.ww.dd}"
  }
  stdout { codec => rubydebug }
}
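
A note on the index name: in Joda-Time date patterns, xxxx is the week-based year and ww the week of that year, while dd is the day of month, so %{+xxxx.ww.dd} mixes two different calendars and jumps around New Year. If plain daily indices are intended, a sketch of the more common pattern:

```
output {
  elasticsearch {
    hosts => ["elkserver.local:9200"]
    # YYYY.MM.dd = calendar year, month and day of month (Joda-Time tokens)
    index => "logstash-dmarc-%{+YYYY.MM.dd}"
  }
}
```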

I am kind of stuck here and any help would be highly appreciated!


(Magnus Bäck) #2

Always post configuration file contents, log files, XML snippets, etc. as preformatted text; otherwise it will get mangled. Please also feel free to use the preview pane on the right to proofread your posts and check that they're legible.


(Jack Lauret) #3

Okay, let me try again:

My XML looks like this:

<?xml version="1.0" encoding="UTF-8" ?>
<feedback>
<version>1.0</version>
<report_metadata>
	<org_name>aol.net</org_name>
	<email>dmarc-admin@alerts.aol.net</email>
	<report_id>v2-1529909933-example.com</report_id>
	<date_range>
		<begin>1529798400</begin>
		<end>1529884800</end>
	</date_range>
</report_metadata>
<policy_published>
	<domain>example.com</domain>
	<adkim>s</adkim>
	<aspf>r</aspf>
	<p>none</p>
	<sp>none</sp>
	<pct>100</pct>
	<fo>0</fo>
</policy_published>
<record>
	<row>
		<source_ip>138.68.66.8</source_ip>
		<count>1</count>
		<policy_evaluated>
			<disposition>none</disposition>
			<dkim>fail</dkim>
			<spf>fail</spf>
		</policy_evaluated>
	</row>
	<identifiers>
		<header_from>example.com</header_from>
	</identifiers>
	<auth_results>
		<spf>
			<domain>example.com</domain>
			<scope>mfrom</scope>
			<result>softfail</result>
		</spf>
	</auth_results>
</record>
</feedback>

My pipeline looks like this

input {
  file {
    path => "D:/DMARC/*.xml"
    discover_interval => 5
    start_position => "beginning"
    close_older => 20
    codec => multiline {
      pattern => "^<\?feedback .*\>"
      negate => "true"
      what => "previous"
    }
  }
}
filter {
  xml {
    source => "message"
    store_xml => false
    target => "feedback"
    force_array => false
    xpath => [
      "/feedback/report_metadata/org_name/text()", "dmarc_reporter",
      "/feedback/report_metadata/email/text()", "dmarc_reporter_email",
      "/feedback/report_metadata/report_id/text()", "dmarc_report_id",
      "/feedback/report_metadata/date_range/begin/text()", "dmarc_time_start",
      "/feedback/report_metadata/date_range/end/text()", "timestamp",
      "/feedback/policy_published/domain/text()", "dmarc_dns_domain",
      "/feedback/policy_published/aspf/text()", "dmarc_spf_alignment",
      "/feedback/policy_published/p/text()", "dmarc_policy",
      "/feedback/policy_published/sp/text()", "dmarc_sp",
      "/feedback/policy_published/pct/text()", "dmarc_pct",
      "/feedback/policy_published/adkim/text()", "dkim_alignment",
      "/feedback/record/row/source_ip/text()", "dmarc_source_ip",
      "/feedback/record/row/count/text()", "dmarc_source_ip_count",
      "/feedback/record/row/policy_evaluated/disposition/text()", "dmarc_disposition",
      "/feedback/record/row/policy_evaluated/dkim/text()", "dmarc_dkim_result",
      "/feedback/record/row/policy_evaluated/spf/text()", "dmarc_spf_result",
      "/feedback/record/identifiers/envelope_from/text()", "dmarc_env_from",
      "/feedback/record/identifiers/header_from/text()", "dmarc_hdr_from",
      "/feedback/record/auth_results/dkim/domain/text()", "dmarc_dkim_auth_domain",
      "/feedback/record/auth_results/dkim/result/text()", "dmarc_dkim_auth_result",
      "/feedback/record/auth_results/spf/domain/text()", "dmarc_spf_auth_domain",
      "/feedback/record/auth_results/spf/result/text()", "dmarc_spf_auth_result"
    ]
  }
  mutate {
    convert => [ "timestamp", "integer" ]
    remove_field => [ "message" ]
  }
  date {
    match => [ "timestamp", "UNIX" ]
  }
}
output {
  elasticsearch {
    hosts => ["debruelk2.de.eu.sew:9200"]
    index => "logstash-dmarc-%{+xxxx.ww.dd}"
  }
  stdout { codec => rubydebug }
}

I have already changed the pipeline a bit but it is still not working...

Help would be highly appreciated.


(Magnus Bäck) #4
pattern => "^<\?feedback .*\>"

Your XML doesn't contain any <?feedback> tags (only <?xml ...?> and <feedback>). Apart from that things look pretty okay from what I can tell. What do you get from your stdout output after fixing the multiline codec?
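
A sketch of a multiline pattern anchored on the <?xml declaration instead, assuming one report per file:

```
codec => multiline {
  # every line that does not start with an XML declaration is appended
  # to the previous event, so each report ends up as a single event
  pattern => "^<\?xml "
  negate => "true"
  what => "previous"
}
```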


(Jack Lauret) #5

Hi Magnus,

In the end it had nothing to do with the XML tags.
I don't know why, but I had to use this to set the correct timestamp:

date {
  match =>  [ "[dmarc_time_start][0]","UNIX" ]
}

instead of

date {
  match =>  [ "dmarc_time_start","UNIX" ]
}

Looks like the XPath result was extracted as an array...
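
The xpath option of the xml filter stores its matches as arrays; force_array only applies to the document written under target. As an alternative to indexing into the array inside the date filter, the field can be flattened first; a sketch assuming the field names from the pipeline above:

```
filter {
  mutate {
    # replace the one-element array with its first element via sprintf
    replace => { "dmarc_time_start" => "%{[dmarc_time_start][0]}" }
  }
  date {
    match => [ "dmarc_time_start", "UNIX" ]
  }
}
```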
Now I've got another error :slight_smile:

  • multiline_codec_max_lines_reached
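
This tag means the multiline codec hit its max_lines limit, 500 lines by default, which a large DMARC report can easily exceed. A sketch of a codec with raised limits, assuming defaults otherwise:

```
codec => multiline {
  pattern => "^<\?xml "
  negate => "true"
  what => "previous"
  max_lines => 10000      # default is 500 lines per event
  max_bytes => "20 MiB"   # default is 10 MiB per event
}
```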

Thanks,
Jack


(system) #6

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.