Hello guys,
OK, the integration is Fortigate Firewall which collects firewall logs from a log file. This integration is added to an agent policy.
What is important is the following:
- I've added a custom ingest pipeline to the integration policy named : "logs-fortinet_fortigate.log@custom" and its content is this:
PUT _ingest/pipeline/logs-fortinet_fortigate.log@custom
{
"description": "Reroute Firewall logs to sub-datastreams",
"version": 2,
"processors": [
{
"reroute": {
"dataset": [
"fortinet_fortigate.log_forward"
],
"namespace": [
"default"
],
"if": "ctx.fortinet.firewall.subtype == 'forward'"
}
}
]
}
When testing, I added a document from the fortigate firewall datastream indexes and the output of the test is as follows:
{
"docs": [
{
"doc": {
"_index": ".ds-fortinet_fortigate.log_forward-default",
"_version": "-3",
"_id": "1gtyfpgBGjSND2_ayxHd",
"_source": {
"agent": {
"name": "<AGENT_NAME>",
"id": "<AGENT_ID>",
"type": "filebeat",
"ephemeral_id": "24a8a5ad-f1ea-4b4a-bde9-ab67c3862d32",
"version": "8.16.6"
},
"log": {
"file": {
"path": "/var/logs/firewall.log"
},
"offset": 6571314386,
"level": "notice"
},
"elastic_agent": {
"id": "<AGENT_ID>",
"version": "8.16.6",
"snapshot": false
},
"destination": {
"port": 53,
"bytes": 91,
"mac": "00-50-56-93-30-50",
"packets": 1,
"ip": "192.168.1.5"
},
"rule": {
"ruleset": "policy",
"name": "<POLICY_NAME>",
"id": "<POLICY_ID>",
"category": "unscanned",
"uuid": "<POLICY_UUID>"
},
"source": {
"port": 40249,
"bytes": 75,
"mac": "00-00-00-11-11-11",
"packets": 1,
"ip": "192.168.1.6"
},
"tags": [
"fortinet-fortigate",
"fortinet-firewall",
"forwarded"
],
"network": {
"protocol": "dns",
"transport": "udp",
"bytes": 166,
"iana_number": "17",
"packets": 2
},
"input": {
"type": "log"
},
"observer": {
"ingress": {
"interface": {
"name": "<IFACE_NAME>"
}
},
"product": "Fortigate",
"vendor": "Fortinet",
"name": "<FIREWALL_NAME>",
"serial_number": "<FIREWALL_SERIAL_NUMBER>",
"type": "firewall",
"egress": {
"interface": {
"name": "<IFACE_NAME>"
}
}
},
"@timestamp": "2025-08-06T10:11:10.000+02:00",
"ecs": {
"version": "8.17.0"
},
"related": {
"ip": [
"<IP1>",
"<IP2>"
]
},
"data_stream": {
"namespace": "default",
"type": ".ds",
"dataset": "fortinet_fortigate.log_forward"
},
"fortinet": {
"firewall": {
"srcintfrole": "lan",
"logver": "0704072731",
"dsthwvendor": "<VENDOR>",
"srcserver": "0",
"itime": "1754467870",
"sessionid": "1114085446",
"itime_converted": "2025-08-06T08:11:10.000Z",
"type": "traffic",
"vd": "root",
"srccountry": "Reserved",
"dstintfrole": "lan",
"subtype": "forward",
"mastersrcmac": "00:50:56:a9:09:31",
"action": "accept",
"masterdstmac": "00:50:56:93:30:50",
"trandisp": "noop",
"dstcountry": "Reserved",
"srchwvendor": "<VENDOR>",
"timestamp": "1754478670",
"dstserver": "0"
}
},
"event": {
"code": "0000000013",
"timezone": "+0200",
"kind": "event",
"start": "2025-08-06T10:11:10.115+02:00",
"type": [
"connection",
"end",
"allowed"
],
"duration": 181000000000,
"agent_id_status": "verified",
"ingested": "2025-08-06T08:15:09Z",
"action": "accept",
"category": [
"network"
],
"dataset": "fortinet_fortigate.log_forward",
"outcome": "success"
}
},
"_ingest": {
"timestamp": "2025-08-06T08:23:21.418586505Z"
}
}
}
]
}
Which shows that the ingest pipeline works perfectly and the index ".ds-fortinet_fortigate.log_forward-default" will be created.
But, after applying this, the index will not be created and the newly received forward logs will not be indexed.
I'm using Elasticsearch version 8.16.1