Append data to each line

I'm ingesting XML data that looks like this:

<statement><field1><field2></field2></field1></statement>
<record><field1><field2></field2></field1></record>
<record><field1><field2></field2></field1></record>
<record><field1><field2></field2></field1></record>

Is it possible to configure Logstash to add <statement><field1><field2></field2></field1></statement> to the beginning or end of each record line?

Sure, just use a mutate filter and its replace option.

mutate {
  replace => {
    "record" => "<statement><field1><field2></field2></field1></statement>%{record}"
  }
}

I'm not sure that would work. My current pipeline ingests the XML document and then uses the xml filter on the message field to parse out the different fields. Since <statement><field1><field2></field2></field1></statement> is on its own line, the xml filter sees it as a single event. I'd gladly rearrange things if it's possible to make this work, though.

input {
  file {
    id => "Ingest"
    path => "C:/DMARC/*.xml"
    codec => multiline {
      negate => true
      pattern => "<record>"
      what => "previous"
    }
  }
}
filter {
  xml {
    id => "Parse"
    force_array => true
    store_xml => false
    source => "message"
    xpath => [
      "feedback/report_metadata/org_name/text()", "report.org",
      "feedback/report_metadata/email/text()", "report.org_contact",
      "feedback/report_metadata/extra_contact_info/text()", "report.additional_contact",
      "feedback/report_metadata/report_id/text()", "report.id",
      "feedback/report_metadata/date_range/begin/text()", "report.start",
      "feedback/report_metadata/date_range/end/text()", "report.end",
      "feedback/policy_published/domain/text()", "policy.domain",
      "feedback/policy_published/aspf/text()", "policy.spf_mode",
      "feedback/policy_published/adkim/text()", "policy.dkim_mode",
      "feedback/policy_published/p/text()", "policy.dmarc.domain_action",
      "feedback/policy_published/sp/text()", "policy.dmarc.subdomain_action",
      "feedback/policy_published/pct/text()", "policy.percentage",
      "record/row/source_ip/text()", "email.source_ip",
      "record/row/count/text()", "email.count",
      "record/row/policy_evaluated/disposition/text()", "email.dmarc_action",
      "record/row/policy_evaluated/spf/text()", "email.spf_evaluation",
      "record/row/policy_evaluated/dkim/text()", "email.dkim_evaluation",
      "record/row/policy_evaluated/reason/type/text()", "dmarc.override_type",
      "record/row/policy_evaluated/reason/comment/text()", "dmarc.override_comment",
      "record/identifiers/envelope_to/text()", "email.envelope_to",
      "record/identifiers/envelope_from/text()", "email.envelope_from",
      "record/identifiers/header_from/text()", "email.header_from",
      "record/auth_results/dkim/domain/text()", "authresult.dkim_domain",
      "record/auth_results/dkim/result/text()", "authresult.dkim_result",
      "record/auth_results/spf/domain/text()", "authresult.spf_domain",
      "record/auth_results/spf/scope/text()", "authresult.spf_scope",
      "record/auth_results/spf/result/text()", "authresult.spf_result"
    ]
  }
}

Then I don't understand the problem. What does your event currently look like? What would you like it to look like instead?

Each file contains a single policy statement with fields enclosed in the policy_published XML tags. After the policy statement, there are entries for each IP the remote server communicated with, each enclosed in a record XML tag. The policy_published information identifies how and why the actions documented in each record were taken.

The two images below show what the events currently look like. What I am looking for is a way to include the policy_published fields with each record.

Oh, so you basically want to merge two events coming from different files. That's not something Logstash is very good at doing.

No, I want to copy the policy_published data for each file into each record event in the same file. The file looks like this:

<policy_published><field1><field2></field2></field1></policy_published>
<record><field1><field2></field2></field1></record>
<record><field1><field2></field2></field1></record>
<record><field1><field2></field2></field1></record>
<record><field1><field2></field2></field1></record>

I want this:

<policy_published><field1><field2></field2></field1></policy_published><record><field1><field2></field2></field1></record>
<policy_published><field1><field2></field2></field1></policy_published><record><field1><field2></field2></field1></record>
<policy_published><field1><field2></field2></field1></policy_published><record><field1><field2></field2></field1></record>
<policy_published><field1><field2></field2></field1></policy_published><record><field1><field2></field2></field1></record>

You say "into each record" but the example depicts the policy_published elements alongside the record. Anyway, it sounds like you'd want to use a ruby filter.

Maybe my example syntax is wrong. I just want the two parsed events shown in the images posted earlier to end up merged together: one event containing all the policy.* fields for each record event.

I'm no programmer, if you haven't gathered that already, but I'm guessing I would use Ruby to store the data I want copied into a variable and then call that variable using inline Ruby?
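
That is roughly the idea. As a minimal sketch of the logic in plain Ruby (not a tested Logstash configuration): remember the most recent policy_published block and prepend it to every later event that contains a record. The class name PolicyCarrier, the method apply, and the sample XML are hypothetical and only for illustration. The sketch also assumes events arrive in file order, which in Logstash would likely mean running a single pipeline worker and not interleaving several files.

```ruby
# Hypothetical helper: keeps the last <policy_published> block seen
# and prepends it to each later message that contains a <record>.
class PolicyCarrier
  # Non-greedy match; the m flag lets "." span line breaks in multiline events.
  POLICY_RE = %r{<policy_published>.*?</policy_published>}m

  def initialize
    @policy = nil # most recent policy block, nil until one is seen
  end

  # Takes one event's message string and returns the message to keep.
  def apply(message)
    if (match = message.match(POLICY_RE))
      @policy = match[0] # remember the policy for the records that follow
      return message
    end
    # Leave the message alone if no policy is known or it is not a record.
    return message unless @policy && message.include?("<record>")

    "#{@policy}#{message}"
  end
end

# Illustrative input: one policy event followed by two record events.
carrier = PolicyCarrier.new
events = [
  "<policy_published><domain>example.com</domain></policy_published>",
  "<record><row>1</row></record>",
  "<record><row>2</row></record>"
]
merged = events.map { |message| carrier.apply(message) }
puts merged
```

Inside a ruby filter, the same steps would presumably go in the code option, reading the text with event.get("message") and writing it back with event.set("message", ...), with the stored policy held in an instance variable created in the init option. Note that the merged text has two top-level elements, so the xml filter may need a wrapping root element before it can parse it.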
