I played with the split filter but couldn't figure out the syntax for using it on multiple fields. Do I have to specify split {field} every time I want to split a field, or can I stack them into a single array somehow?
A split filter uses a single array as input. If your event contains two arrays that should be used in the split (i.e., the first split-off event should contain the first item from both arrays, the second event the second item from both arrays, and so on), you'll have to do the joining before the split filter so that you have a single array of objects. In other words, if you have
{
  ...
  "field1": ["a", "b", "c"],
  "field2": ["d", "e", "f"],
  ...
}
and want to get events like
{
  ...
  "field1": "a",
  "field2": "d",
  ...
}
you need to rearrange the input event like this:
{
  ...
  "fields": [
    {"field1": "a", "field2": "d"},
    {"field1": "b", "field2": "e"},
    {"field1": "c", "field2": "f"}
  ],
  ...
}
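A minimal sketch of that rearrangement, assuming Logstash 5.0 or later (for the `event.get`/`event.set` API; Logstash 2.x uses `event['field']` instead) and that `field1` and `field2` are arrays of equal length. The field names come from the example above; doing the joining in a `ruby` filter is one possible approach, not the only one.

```
filter {
  # Zip the two parallel arrays into a single array of objects called "fields".
  ruby {
    code => '
      a = event.get("field1")
      b = event.get("field2")
      if a.is_a?(Array) && b.is_a?(Array)
        event.set("fields", a.zip(b).map { |x, y| { "field1" => x, "field2" => y } })
        event.remove("field1")
        event.remove("field2")
      end
    '
  }
  # Emit one event per element of "fields".
  split {
    field => "fields"
  }
  # Move the per-item values back to the top level and drop the wrapper.
  mutate {
    rename => {
      "[fields][field1]" => "field1"
      "[fields][field2]" => "field2"
    }
    remove_field => [ "fields" ]
  }
}
```

With the input event shown above, this should yield three events, the first one carrying `field1` = "a" and `field2` = "d".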
What if I want to keep events a-f and place them in separate fields? Do I then say
filter {
  split {
    "field1"
  }
  split {
    "field2"
  }
  split {
    "field3"
  }
  etc....
I don't understand what you mean. Please give an example.
If you could just indulge me, please create a pipeline configuration showing how you believe this data should be processed. I've already developed a pipeline and process, but you believe your method is better. I'm not saying you're wrong or that I don't believe you; I'm just not getting it, and the examples you provide aren't getting through to me. In addition, my process isn't compatible with Linux-based installations, so if you can provide something that is, that would be even better.
My Process
*Remove encoding statement
*Relocate Report_metadata, and its sub-elements, into the record element.
*Relocate Policy_Published, and its sub-elements, into the record element.
*Each record element, with its sub-elements, is considered a single event.
*Each element inside of record is parsed to its own field.
I'm afraid cooking up a complete and working configuration would take more time than I have to spend.
Lol, I can't help but feel like you're trolling me now. I appreciate the help you've provided.
Hi,
I'm also in the process of importing DMARC reports in ES using Logstash.
Below is my Logstash configuration so far. It does what you want: each <record> is an event with <report_metadata> and <policy_published> included.
Somehow, by using split on <record>, <report_metadata> and <policy_published> got copied into every event. FYI I'm using Logstash 2.4 (I don't host it).
It's my first time using the Elastic Stack so I guess it could use some optimization.
@wwalker ping me if you're willing to share your Kibana dashboards, I'd be happy to share mine once I get everything set up.
filter {
  # Drop events that contain the XML declaration.
  if "xml version" in [message] {
    drop { }
  }
  xml {
    force_array => true
    store_xml => true
    source => "message"
    target => "_feedback"
  }
  # One event per <record>.
  split {
    field => "[_feedback][record]"
  }
  mutate {
    add_field => {
      "_report_source_ip" => "%{[_feedback][record][row][0][source_ip]}"
      "_report_org_name" => "%{[_feedback][report_metadata][0][org_name]}"
      "_report_timestamp" => "%{[_feedback][report_metadata][0][date_range][0][begin]}"
      "_report_count_int" => "%{[_feedback][record][row][0][count]}"
      "_report_evaluated_disposition" => "%{[_feedback][record][row][0][policy_evaluated][0][disposition]}"
      "_report_evaluated_dkim" => "%{[_feedback][record][row][0][policy_evaluated][0][dkim]}"
      "_report_evaluated_spf" => "%{[_feedback][record][row][0][policy_evaluated][0][spf]}"
      "_policy_domain" => "%{[_feedback][policy_published][0][domain]}"
      "_policy_pct_int" => "%{[_feedback][policy_published][0][pct]}"
      "_policy_p" => "%{[_feedback][policy_published][0][p]}"
      "_policy_adkim" => "%{[_feedback][policy_published][0][adkim]}"
      "_policy_aspf" => "%{[_feedback][policy_published][0][aspf]}"
    }
  }
  # The remaining elements are optional in a report, so only copy them when present.
  if [_feedback][record][auth_results][0][spf] {
    mutate {
      add_field => {
        "_report_spf_domain" => "%{[_feedback][record][auth_results][0][spf][0][domain]}"
        "_report_spf_result" => "%{[_feedback][record][auth_results][0][spf][0][result]}"
      }
    }
  }
  if [_feedback][record][auth_results][0][dkim] {
    mutate {
      add_field => {
        "_report_dkim_result" => "%{[_feedback][record][auth_results][0][dkim][0][result]}"
        "_report_dkim_domain" => "%{[_feedback][record][auth_results][0][dkim][0][domain]}"
      }
    }
  }
  if [_feedback][record][identifiers][0][header_from] {
    mutate {
      add_field => {
        "_report_header_from" => "%{[_feedback][record][identifiers][0][header_from]}"
      }
    }
  }
  # policy_published sits next to record under _feedback, not inside it.
  if [_feedback][policy_published][0][sp] {
    mutate {
      add_field => {
        "_policy_sp" => "%{[_feedback][policy_published][0][sp]}"
      }
    }
  }
  if [_report_source_ip] {
    geoip {
      source => "[_report_source_ip]"
      target => "_report_source_geoip"
    }
  }
  mutate {
    convert => {
      "_report_count_int" => "integer"
      "_policy_pct_int" => "integer"
    }
    # remove_field takes a field name, not a %{...} reference.
    remove_field => [ "_feedback" ]
  }
}
That's awesome, I didn't think to use split for that. I'm battling the latest version's issues with the xml filter, but I'll export my dashboards for you.
This might be because of the version differences, but there's a lot of extra logic in yours. How are you getting data into Logstash, and what does your input look like? I've also developed a template that will let you get rid of the field conversion mutations. I'm not sure what customizations, other than field renaming, you'll have to make to get it to work with your version.
I'm using Filebeat to send reports to Logstash. The relevant part of this configuration (the multiline part) can be used directly in Logstash:
filebeat.prospectors:
- input_type: log
  paths:
    - /dmarc/*.xml
  close_eof: true
  multiline:
    pattern: '<feedback'
    negate: true
    match: after
Then my Logstash input configuration is just:
input {
  beats {
    port => 5044
    # SSL config
  }
}
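If the reports were read by Logstash itself rather than shipped by Filebeat, the same multiline grouping might look like the sketch below. The `file` input, `start_position`, and the `auto_flush_interval` value are assumptions for illustration and are not taken from the setup above. Filebeat's `match: after` corresponds to `what => "previous"` in the multiline codec.

```
input {
  file {
    path => "/dmarc/*.xml"
    start_position => "beginning"
    codec => multiline {
      # Every line that does NOT contain "<feedback" is appended to the previous line.
      pattern => "<feedback"
      negate => true
      what => "previous"
      # Flush the pending event after 5 idle seconds so the last report is not held back.
      auto_flush_interval => 5
    }
  }
}
```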
Thanks for your template (I didn't know about this feature) and visualization configs. I'm currently on a testing environment with old versions of ELK and only a few DMARC reports. I'll get back to you once I have everything set up on the latest versions.
I'm convinced it's the xml filter's xpath function that's jacked up. I've tried a couple different syntaxes and it seems that once I enter the correct path to the value, it errors out, like it doesn't know what to do.
No error, but no field creation, on these
xpath => [ "/record/row/source_ip[0]", "email.source_ip" ]
xpath => [ "/record/row/source_ip[1]", "email.source_ip" ]
xpath => [ "record/row/source_ip[0]", "email.source_ip" ]
xpath => [ "record/row/source_ip[1]", "email.source_ip" ]
Error on these
xpath => [ "/record/row/source_ip", "email.source_ip" ]
xpath => [ "record/row/source_ip", "email.source_ip" ]
xpath => [ "/record/row/source_ip/text()", "email.source_ip" ]
xpath => [ "record/row/source_ip/text()", "email.source_ip" ]
Although the above is based on my modified XMLs, it also errors out on the original XMLs when I change my xpaths to include /feedback at the beginning.
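One thing worth noting about the first group: XPath positions are 1-based, so a `[0]` predicate never matches anything, which would fit the `[0]` lines producing no error and no field. A path starting with `/` is also resolved from the document root, so it has to begin with the root element's name. A sketch against the unmodified reports, assuming the root element is `feedback` as mentioned above and keeping the same destination field name:

```
filter {
  xml {
    source => "message"
    # Only extract values via xpath; do not store the whole parsed document.
    store_xml => false
    # source_ip of the first <record> (positions start at 1, not 0).
    xpath => [
      "/feedback/record[1]/row/source_ip/text()", "email.source_ip"
    ]
  }
}
```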
OK, so I fixed my issue, which actually had to do with using disk buffering (persisted queue) with Logstash and not the pipeline configuration. So, that said, I create my fields using the xml filter's xpath option. I think tomorrow I will load up your pipeline and see how that works. I'd really like to figure out a solution that doesn't require PowerShell and works with Linux setups.