Hi there!
I am trying to ingest my DMARC Reports via logstash. I manage to parse the XML, however I fail to get the correct timestamp for the events.
An XML example is:
<?xml version="1.0"?> 0.1 AMAZON-SES postmaster@amazonses.com ae2ae781-07d7-4562-8203-297b42968b11 1529366400 1529452800 example.com s rnone
none 100 0 190.190.190.190 1 none pass fail from.com example.com example.com pass example.com passI would like to use the end data; feedback\report_metadata\date_range\end as my timestamp. The value of this field is unix.
My pipeline looks like this:
input {
file {
path => "D:/DMARC/.xml"
discover_interval => 5
start_position => "beginning"
close_older => 60
codec => multiline {
pattern => "^<?feedback .>"
negate => "true"
what => "previous"
}
}
}
filter {
xml {
source => "message"
store_xml => false
target => "feedback"
force_array => false
xpath => [
"/feedback/report_metadata/org_name/text()", "dmarc_reporter",
"/feedback/report_metadata/email/text()", "dmarc_reporter_email",
"/feedback/report_metadata/report_id/text()", "dmarc_repor_id",
"/feedback/report_metadata/date_range/begin/text()", "dmarc_time_start",
"/feedback/report_metadata/date_range/end/text()", "timestamp",
"/feedback/policy_published/domain/text()", "dmarc_dns_domain",
"/feedback/policy_published/aspf/text()", "dmarc_spf_alignment",
"/feedback/policy_published/p/text()", "dmarc_policy",
"/feedback/policy_published/sp/text()", "dmarc_sp",
"/feedback/policy_published/pct/text()", "dmarc_pct",
"/feedback/policy_published/adkim/text()", "dkim_alignment",
"/feedback/record/row/source_ip/text()", "dmarc_source_ip",
"/feedback/record/row/count/text()", "dmarc_source_ip_count",
"/feedback/record/row/policy_evaluated/disposition/text()", "dmarc_disposition",
"/feedback/record/row/policy_evaluated/dkim/text()", "dmarc_dkim_result",
"/feedback/record/row/policy_evaluated/spf/text()", "dmarc_spf_result",
"/feedback/record/identifiers/envelope_from/text()", "dmarc_env_from",
"/feedback/record/identifiers/header_from/text()", "dmarc_hdr_from",
"/feedback/record/auth_results/dkim/domain/text()", "dmarc_dkim_auth_domain",
"/feedback/record/auth_results/dkim/result/text()", "dmarc_dkim_auth_result",
"/feedback/record/auth_results/spf/domain/text()", "dmarc_spf_auth_domain",
"/feedback/record/auth_results/spf/result/text()", "dmarc_spf_auth_result"
]
}
date {
match => ['timestamp','UNIX']
}
mutate {
gsub => [
"dkim_alignment", "r", "Relaxed",
"dkim_alignment", "s", "Strict",
"spf_alignment", "r", "Relaxed",
"spf_alignment", "s", "Strict"
]
}
}
output {
elasticsearch {
hosts => ["elkserver.local:9200"]
index => "logstash-dmarc-%{+xxxx.ww.dd}"
}
stdout { codec => rubydebug }
}
I am kind of stuck here and any help would be highly appreciated!