Hiya @geoffrey, I had a little fun tonight.
Note: I'm not a DMARC expert, so I let an LLM help me.
This whole example assumes variable DMARC records. If they are not variable, this becomes much simpler.
If I were really doing this at scale, I would parse this on ingest with an ingest pipeline and the KV processor, so the data would be parsed before it was ever written. But in this case I am doing it at query time with ES|QL.
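For reference, the ingest-time route could look roughly like the sketch below. It is untested against real data and assumes the raw record lives in `message`; the `dmarc` target field is just a name picked for illustration. It uses the simulate API, so nothing is written:

```console
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "kv": {
          "field": "message",
          "field_split": ";\\s*",
          "value_split": "=",
          "target_field": "dmarc"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "v=DMARC1; p=quarantine; pct=25; adkim=r; rua=mailto:dmarc@shop.example"
      }
    }
  ]
}
```

Each tag should come back as a string under `dmarc` (`dmarc.p`, `dmarc.pct`, and so on), and tags that are missing from a record are simply not created, which is why variable records are not a problem on this path.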
Load some test DMARC records with _bulk. You might want to update the timestamps.
Kibana -> Dev Tools
DELETE _data_stream/logs-dmarc-default
POST _bulk
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:33.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; rua=mailto:dmarc-reports@example.com"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:34.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=quarantine; adkim=r; aspf=r; rua=mailto:dmarc@example.org; ruf=mailto:forensics@example.org; fo=1"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:35.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=none; rua=mailto:dmarc-monitor@example.net"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:36.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; adkim=s; aspf=s; rua=mailto:reports@mail.example.com"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:37.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=quarantine; pct=50; rua=mailto:dmarc@company.com"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:38.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; rua=mailto:dmarc1@bank.example,mailto:dmarc2@vendor.com"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:39.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; ruf=mailto:forensics@secure.example; fo=0"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:40.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=none; rua=mailto:dmarc@startup.io; ruf=mailto:forensics@startup.io; fo=1"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:41.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=quarantine; pct=25; adkim=r; rua=mailto:dmarc@shop.example"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:42.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; adkim=s; aspf=s; rua=mailto:dmarc@global.example,mailto:dmarc@securityvendor.com; ruf=mailto:ruf@global.example; fo=1"}
Note: there is no KV processor in ES|QL yet, but here is an example. The challenge is that the KV pairs are variable. Perhaps they are not in the real world, in which case this becomes much simpler.
So here is a DISSECT that pulls out the version and policy, then an EVAL that looks at the rest of the string and plucks out each of the fields.
I have another example, and I am sure there are other ways to do it... When KV comes it will be a natural fit.
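To see why each field is wrapped in a CASE with a LOCATE check, here is a small sketch using ROW with one made-up `rest` value (the column names `with_guard`, `no_guard` and `guarded_missing` are only for illustration, and the `SUBSTRING(..., 0)` wrapper is left out to keep it short). When the tag is missing, the regex does not match, so REPLACE on its own hands back the whole string unchanged; the LOCATE guard is what turns that case into NULL:

```console
POST _query?format=txt
{
  "query": """
    ROW rest = "pct=25; adkim=r; rua=mailto:dmarc@shop.example"
    | EVAL
        // adkim is present, so the capture group is returned
        with_guard = CASE(LOCATE(rest, "adkim=") > 0, REPLACE(rest, ".*adkim=([^;]+).*", "$1"), NULL),
        // aspf is absent: REPLACE alone leaves the string unchanged
        no_guard = REPLACE(rest, ".*aspf=([^;]+).*", "$1"),
        // aspf is absent: the LOCATE guard turns that into NULL
        guarded_missing = CASE(LOCATE(rest, "aspf=") > 0, REPLACE(rest, ".*aspf=([^;]+).*", "$1"), NULL)
  """
}
```

I would expect `with_guard` to be `r`, `no_guard` to be the full input string, and `guarded_missing` to be null.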
If you run it as a query in Kibana -> Dev Tools
POST _query?format=txt
{
  "query": """
    FROM logs-dmarc-default
    | DISSECT message "v=%{version}; p=%{policy}; %{rest}"
    | EVAL
        // Extract DKIM alignment mode (adkim) if present
        dkim_alignment = CASE(
          LOCATE(rest, "adkim=") > 0,
          SUBSTRING(REPLACE(rest, ".*adkim=([^;]+).*", "$1"), 0),
          NULL
        ),
        // Extract SPF alignment mode (aspf) if present
        spf_alignment = CASE(
          LOCATE(rest, "aspf=") > 0,
          SUBSTRING(REPLACE(rest, ".*aspf=([^;]+).*", "$1"), 0),
          NULL
        ),
        // Extract percentage (pct) if present
        percentage = CASE(
          LOCATE(rest, "pct=") > 0,
          SUBSTRING(REPLACE(rest, ".*pct=([^;]+).*", "$1"), 0),
          NULL
        ),
        // Extract aggregate report URI (rua) if present
        aggregate_reports = CASE(
          LOCATE(rest, "rua=") > 0,
          SUBSTRING(REPLACE(rest, ".*rua=([^;]+).*", "$1"), 0),
          NULL
        ),
        // Extract forensic report URI (ruf) if present
        forensic_reports = CASE(
          LOCATE(rest, "ruf=") > 0,
          SUBSTRING(REPLACE(rest, ".*ruf=([^;]+).*", "$1"), 0),
          NULL
        ),
        // Extract forensic options (fo) if present
        forensic_options = CASE(
          LOCATE(rest, "fo=") > 0,
          SUBSTRING(REPLACE(rest, ".*fo=([^;]+).*", "$1"), 0),
          NULL
        )
    | DROP rest
    | SORT @timestamp DESC
    | KEEP @timestamp, message, dkim_alignment, spf_alignment, percentage, aggregate_reports, forensic_reports, forensic_options
    | LIMIT 10
  """
}
You should get something like this
@timestamp | message |dkim_alignment | spf_alignment | percentage | aggregate_reports | forensic_reports |forensic_options
------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+---------------+---------------+---------------+-----------------------------------------------------------+-------------------------------+----------------
2025-09-20T04:28:42.971Z|v=DMARC1; p=reject; adkim=s; aspf=s; rua=mailto:dmarc@global.example,mailto:dmarc@securityvendor.com; ruf=mailto:ruf@global.example; fo=1|s |s |null |mailto:dmarc@global.example,mailto:dmarc@securityvendor.com|mailto:ruf@global.example |1
2025-09-20T04:28:41.971Z|v=DMARC1; p=quarantine; pct=25; adkim=r; rua=mailto:dmarc@shop.example |r |null |25 |mailto:dmarc@shop.example |null |null
2025-09-20T04:28:40.971Z|v=DMARC1; p=none; rua=mailto:dmarc@startup.io; ruf=mailto:forensics@startup.io; fo=1 |null |null |null |mailto:dmarc@startup.io |mailto:forensics@startup.io |1
2025-09-20T04:28:39.971Z|v=DMARC1; p=reject; ruf=mailto:forensics@secure.example; fo=0 |null |null |null |null |mailto:forensics@secure.example|0
2025-09-20T04:28:38.971Z|v=DMARC1; p=reject; rua=mailto:dmarc1@bank.example,mailto:dmarc2@vendor.com |null |null |null |mailto:dmarc1@bank.example,mailto:dmarc2@vendor.com |null |null
2025-09-20T04:28:37.971Z|v=DMARC1; p=quarantine; pct=50; rua=mailto:dmarc@company.com |null |null |50 |mailto:dmarc@company.com |null |null
2025-09-20T04:28:36.971Z|v=DMARC1; p=reject; adkim=s; aspf=s; rua=mailto:reports@mail.example.com |s |s |null |mailto:reports@mail.example.com |null |null
2025-09-20T04:28:35.971Z|v=DMARC1; p=none; rua=mailto:dmarc-monitor@example.net |null |null |null |mailto:dmarc-monitor@example.net |null |null
2025-09-20T04:28:34.971Z|v=DMARC1; p=quarantine; adkim=r; aspf=r; rua=mailto:dmarc@example.org; ruf=mailto:forensics@example.org; fo=1 |r |r |null |mailto:dmarc@example.org |mailto:forensics@example.org |1
2025-09-20T04:28:33.971Z|v=DMARC1; p=reject; rua=mailto:dmarc-reports@example.com |null |null |null |mailto:dmarc-reports@example.com |null |null
If you run it from Discover you will get this, all nicely parsed up.
FROM logs-dmarc-default
| DISSECT message "v=%{version}; p=%{policy}; %{rest}"
| EVAL
    // Extract DKIM alignment mode (adkim) if present
    dkim_alignment = CASE(
      LOCATE(rest, "adkim=") > 0,
      SUBSTRING(REPLACE(rest, ".*adkim=([^;]+).*", "$1"), 0),
      NULL
    ),
    // Extract SPF alignment mode (aspf) if present
    spf_alignment = CASE(
      LOCATE(rest, "aspf=") > 0,
      SUBSTRING(REPLACE(rest, ".*aspf=([^;]+).*", "$1"), 0),
      NULL
    ),
    // Extract percentage (pct) if present
    percentage = CASE(
      LOCATE(rest, "pct=") > 0,
      SUBSTRING(REPLACE(rest, ".*pct=([^;]+).*", "$1"), 0),
      NULL
    ),
    // Extract aggregate report URI (rua) if present
    aggregate_reports = CASE(
      LOCATE(rest, "rua=") > 0,
      SUBSTRING(REPLACE(rest, ".*rua=([^;]+).*", "$1"), 0),
      NULL
    ),
    // Extract forensic report URI (ruf) if present
    forensic_reports = CASE(
      LOCATE(rest, "ruf=") > 0,
      SUBSTRING(REPLACE(rest, ".*ruf=([^;]+).*", "$1"), 0),
      NULL
    ),
    // Extract forensic options (fo) if present
    forensic_options = CASE(
      LOCATE(rest, "fo=") > 0,
      SUBSTRING(REPLACE(rest, ".*fo=([^;]+).*", "$1"), 0),
      NULL
    )
| DROP rest
| SORT @timestamp DESC
| KEEP @timestamp, message, dkim_alignment, spf_alignment, percentage, aggregate_reports, forensic_reports, forensic_options
| LIMIT 10
You will get something like this....
Hope this gives you some ideas!!
There are lots of ways to process this.
Have a great weekend!