Hiya @geoffrey I had a little fun tonight
Note not a DMARC expert, let the
(LLM) help me.
_bulk
load some test DMARC You might want to update the time stamps
Kibana -> Dev Tools
DELETE _data_stream/logs-dmarc-default
POST _bulk
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:33.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; rua=mailto:dmarc-reports@example.com"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:34.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=quarantine; adkim=r; aspf=r; rua=mailto:dmarc@example.org; ruf=mailto:forensics@example.org; fo=1"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:35.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=none; rua=mailto:dmarc-monitor@example.net"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:36.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; adkim=s; aspf=s; rua=mailto:reports@mail.example.com"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:37.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=quarantine; pct=50; rua=mailto:dmarc@company.com"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:38.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; rua=mailto:dmarc1@bank.example,mailto:dmarc2@vendor.com"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:39.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; ruf=mailto:forensics@secure.example; fo=0"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:40.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=none; rua=mailto:dmarc@startup.io; ruf=mailto:forensics@startup.io; fo=1"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:41.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=quarantine; pct=25; adkim=r; rua=mailto:dmarc@shop.example"}
{"create": {"_index": "logs-dmarc-default"}}
{"@timestamp": "2025-09-20T04:28:42.971711+00:00", "data_stream": {"type": "logs", "dataset": "dmarc", "namespace": "default"}, "message": "v=DMARC1; p=reject; adkim=s; aspf=s; rua=mailto:dmarc@global.example,mailto:dmarc@securityvendor.com; ruf=mailto:ruf@global.example; fo=1"}
Note, there is no KV processor yet but here is an example...
So here is a DISSECT
, then looking at the rest of the string and plucking out each of the fields.
I have another example, and I am sure there are other ways to do it... When KV comes it will be a natural fit.
If you run it as a query in Kibana -> Dev Tools
POST _query?format=txt
{
"query" : """
FROM logs-dmarc-default
| DISSECT message "v=%{version}; p=%{policy}; %{rest}"
| EVAL
// Extract DKIM alignment mode (adkim) if present
dkim_alignment = CASE(
LOCATE(rest, "adkim=") > 0,
SUBSTRING(REPLACE(rest, ".*adkim=([^;]+).*", "$1"), 0),
NULL
),
// Extract SPF alignment mode (aspf) if present
spf_alignment = CASE(
LOCATE(rest, "aspf=") > 0,
SUBSTRING(REPLACE(rest, ".*aspf=([^;]+).*", "$1"), 0),
NULL
),
// Extract percentage (pct) if present
percentage = CASE(
LOCATE(rest, "pct=") > 0,
SUBSTRING(REPLACE(rest, ".*pct=([^;]+).*", "$1"), 0),
NULL
),
// Extract aggregate report URI (rua) if present
aggregate_reports = CASE(
LOCATE(rest, "rua=") > 0,
SUBSTRING(REPLACE(rest, ".*rua=([^;]+).*", "$1"), 0),
NULL
),
// Extract forensic report URI (ruf) if present
forensic_reports = CASE(
LOCATE(rest, "ruf=") > 0,
SUBSTRING(REPLACE(rest, ".*ruf=([^;]+).*", "$1"), 0),
NULL
),
// Extract forensic options (fo) if present
forensic_options = CASE(
LOCATE(rest, "fo=") > 0,
SUBSTRING(REPLACE(rest, ".*fo=([^;]+).*", "$1"), 0),
NULL
)
| DROP rest
| SORT @timestamp DESC
| KEEP @timestamp, message, dkim_alignment, spf_alignment, percentage, aggregate_reports, forensic_reports, forensic_options
| LIMIT 10
"""
}
You should get something like this
@timestamp | message |dkim_alignment | spf_alignment | percentage | aggregate_reports | forensic_reports |forensic_options
------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+---------------+---------------+---------------+-----------------------------------------------------------+-------------------------------+----------------
2025-09-20T04:28:42.971Z|v=DMARC1; p=reject; adkim=s; aspf=s; rua=mailto:dmarc@global.example,mailto:dmarc@securityvendor.com; ruf=mailto:ruf@global.example; fo=1|s |s |null |mailto:dmarc@global.example,mailto:dmarc@securityvendor.com|mailto:ruf@global.example |1
2025-09-20T04:28:41.971Z|v=DMARC1; p=quarantine; pct=25; adkim=r; rua=mailto:dmarc@shop.example |r |null |25 |mailto:dmarc@shop.example |null |null
2025-09-20T04:28:40.971Z|v=DMARC1; p=none; rua=mailto:dmarc@startup.io; ruf=mailto:forensics@startup.io; fo=1 |null |null |null |mailto:dmarc@startup.io |mailto:forensics@startup.io |1
2025-09-20T04:28:39.971Z|v=DMARC1; p=reject; ruf=mailto:forensics@secure.example; fo=0 |null |null |null |null |mailto:forensics@secure.example|0
2025-09-20T04:28:38.971Z|v=DMARC1; p=reject; rua=mailto:dmarc1@bank.example,mailto:dmarc2@vendor.com |null |null |null |mailto:dmarc1@bank.example,mailto:dmarc2@vendor.com |null |null
2025-09-20T04:28:37.971Z|v=DMARC1; p=quarantine; pct=50; rua=mailto:dmarc@company.com |null |null |50 |mailto:dmarc@company.com |null |null
2025-09-20T04:28:36.971Z|v=DMARC1; p=reject; adkim=s; aspf=s; rua=mailto:reports@mail.example.com |s |s |null |mailto:reports@mail.example.com |null |null
2025-09-20T04:28:35.971Z|v=DMARC1; p=none; rua=mailto:dmarc-monitor@example.net |null |null |null |mailto:dmarc-monitor@example.net |null |null
2025-09-20T04:28:34.971Z|v=DMARC1; p=quarantine; adkim=r; aspf=r; rua=mailto:dmarc@example.org; ruf=mailto:forensics@example.org; fo=1 |r |r |null |mailto:dmarc@example.org |mailto:forensics@example.org |1
2025-09-20T04:28:33.971Z|v=DMARC1; p=reject; rua=mailto:dmarc-reports@example.com |null |null |null |mailto:dmarc-reports@example.com |null |null
If you run it from discover you will get this... all nice a parsed up.
FROM logs-dmarc-default
| DISSECT message "v=%{version}; p=%{policy}; %{rest}"
| EVAL
// Extract DKIM alignment mode (adkim) if present
dkim_alignment = CASE(
LOCATE(rest, "adkim=") > 0,
SUBSTRING(REPLACE(rest, ".*adkim=([^;]+).*", "$1"), 0),
NULL
),
// Extract SPF alignment mode (aspf) if present
spf_alignment = CASE(
LOCATE(rest, "aspf=") > 0,
SUBSTRING(REPLACE(rest, ".*aspf=([^;]+).*", "$1"), 0),
NULL
),
// Extract percentage (pct) if present
percentage = CASE(
LOCATE(rest, "pct=") > 0,
SUBSTRING(REPLACE(rest, ".*pct=([^;]+).*", "$1"), 0),
NULL
),
// Extract aggregate report URI (rua) if present
aggregate_reports = CASE(
LOCATE(rest, "rua=") > 0,
SUBSTRING(REPLACE(rest, ".*rua=([^;]+).*", "$1"), 0),
NULL
),
// Extract forensic report URI (ruf) if present
forensic_reports = CASE(
LOCATE(rest, "ruf=") > 0,
SUBSTRING(REPLACE(rest, ".*ruf=([^;]+).*", "$1"), 0),
NULL
),
// Extract forensic options (fo) if present
forensic_options = CASE(
LOCATE(rest, "fo=") > 0,
SUBSTRING(REPLACE(rest, ".*fo=([^;]+).*", "$1"), 0),
NULL
)
| DROP rest
| SORT @timestamp DESC
| KEEP @timestamp, message, dkim_alignment, spf_alignment, percentage, aggregate_reports, forensic_reports, forensic_options
| LIMIT 10
You will get something like this....
Hope this gives you some ideas!!
Have a great Weekend!