How to dynamically route FortiGate logs to different data_stream namespaces based on devid using Logstash?

Hi everyone,

I’m currently working on integrating logs from two FortiGate devices into Elasticsearch via the following pipeline:

FortiGate (x2) → Elastic Agent (FortiGate integration) → Logstash (dynamic data_stream namespace) → Elasticsearch

Both FortiGate devices send their logs to the same Elastic Agent UDP input (192.168.1.2:517). Here's a simplified version of my elastic-agent.yml config:

inputs:
  - id: udp-fortinet_fortigate-3e50017f-59a9-4009-951e-5b02516c74d9
    name: XXXXX_fortinet_fortigate
    revision: 5
    type: udp
    use_output: fleet-default-output
    meta:
      package:
        name: fortinet_fortigate
        version: 1.28.0
    data_stream:
      namespace: default
    package_policy_id: 3e50017f-59a9-4009-951e-5b02516c74d9
    streams:
      - id: udp-fortinet_fortigate.log-3e50017f-59a9-4009-951e-5b02516c74d9
        data_stream:
          dataset: fortinet_fortigate.log
          type: logs
        host: '192.168.1.2:517'
        tags:
          - preserve_original_event
          - fortinet-fortigate
          - fortinet-firewall
          - forwarded
        publisher_pipeline.disable_host: true
        processors:
          - add_fields:
              target: _temp
              fields:
                internal_networks:
                  - private

"message" => "<189>date=2025-04-22 time=17:07:01 devname="FG-81F" devid="FGT81xxxxx" eventtime=1745312820806637289 tz="+0800" logid="0001000014" type="traffic" subtype="local" level="notice" vd="root" srcip=fe80::20f:3aff:fea8:d71c srcport=546 srcintf="wan1" srcintfrole="wan" dstip=ff02::1:2 dstport=547 dstintf="unknown-0" dstintfrole="undefined" replysrcintf="root" sessionid=9199206 proto=17 action="deny" policyid=0 policytype="local-in-policy6" service="DHCP6" trandisp="noop" app="DHCP6" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 rcvdpkt=0 msg="Connection Failed""

"message" => "<189>date=2025-04-22 time=17:06:59 devname="FG-40F" devid="FGT40xxxxx" eventtime=1745312818809881780 tz="+0800" logid="0001000014" type="traffic" subtype="local" level="notice" vd="root" srcip=192.168.1.4 srcport=49794 srcintf="wan" srcintfrole="wan" dstip=255.255.255.255 dstport=1947 dstintf="unknown-0" dstintfrole="undefined" srccountry="Reserved" dstcountry="Reserved" sessionid=3031780 proto=17 action="deny" policyid=0 policytype="local-in-policy" service="udp/1947" trandisp="noop" app="udp/1947" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 rcvdpkt=0 msg="Connection Failed""

In Logstash, I want to extract devid from the log message and use it to dynamically route the event to a specific data_stream.namespace. My goal is to keep logs from different FortiGates logically separated in their own data streams.

Here's my current Logstash pipeline config:

input {
  elastic_agent {
    port => 5044
  }
}

filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{IP:syslog_ip} %{GREEDYDATA:message}" }
    overwrite => ["message"]
  }
  grok {
    match => { "message" => 'devid="%{DATA:device_id}"' }
  }
  mutate {
    add_field => { "[@metadata][ds_namespace]" => "%{device_id}" }
  }
}

output {
  elasticsearch {
    hosts => ["https://10.200.2.21:9200"]
    user => "elastic"
    password => "xxx"
    data_stream => true
    data_stream_type => "logs"
    data_stream_dataset => "fortinet_fortigate.log"
    data_stream_namespace => "%{[@metadata][ds_namespace]}"  # <== This is the key
    ssl => true
    ssl_certificate_verification => false
  }
}

Ultimately, I would like the logs to be routed into two separate data streams in Elasticsearch based on the devid of each FortiGate device, resulting in:

  • logs-fortinet_fortigate.log-FGT81xxxxx
  • logs-fortinet_fortigate.log-FGT40xxxxx

I want to dynamically route logs to different data_stream.namespace values without affecting the field parsing done by the Elastic Agent integration.

Is this approach using [@metadata][ds_namespace] correct for achieving that goal?
Any advice or best practices would be greatly appreciated.

Thanks in advance 🙏

Yes, but you should use only @metadata fields and avoid changing the source message. Your grok filter overwrites the message field, and this may break the ingest pipeline in Elasticsearch.

The general recommendation when using Logstash between an Elastic Agent and Elasticsearch is not to change the structure of the documents sent by the agent. From the documentation:

Be aware that the structure of the documents sent from Elastic Agent to Logstash must not be modified by the pipeline. (...) We cannot guarantee that the Elasticsearch ingest pipelines associated to the integrations using Elastic Agent can work with missing or modified fields.

But what you want may work if you use only @metadata fields.

Besides that, FortiGate logs are key-value pairs, so grok is not needed; you can use dissect instead. You also need to lowercase the device id value, since data stream names must be lowercase.

Something like this should work:

filter {
    dissect {
        mapping => {
            "message" => '%{}devid="%{[@metadata][ds_namespace]}"%{}'
        }
    }
    mutate {
        lowercase => ["[@metadata][ds_namespace]"]
    }
}
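
If you want to sanity-check the extraction before wiring it into your real pipeline, a throwaway config like this (using a shortened version of your first sample message) prints the captured @metadata to stdout:

input {
  generator {
    count   => 1
    message => '<189>date=2025-04-22 time=17:07:01 devname="FG-81F" devid="FGT81xxxxx" type="traffic" subtype="local"'
  }
}

filter {
  dissect {
    mapping => {
      # %{} skips everything before and after the devid key-value pair
      "message" => '%{}devid="%{[@metadata][ds_namespace]}"%{}'
    }
  }
  mutate {
    # data stream names must be lowercase
    lowercase => ["[@metadata][ds_namespace]"]
  }
}

output {
  # metadata => true is required, rubydebug hides @metadata by default
  stdout { codec => rubydebug { metadata => true } }
}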

My goal is to keep logs from different FortiGates logically separated in their own data streams.

But what is the requirement for this? Are you planning to have different retention based on the device? If so, there are extra steps, like cloning the integration template and creating one template for each data stream.
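
For example, something like this in Dev Tools (a rough sketch; the composed_of list and the ILM policy name are placeholders, copy the real values from the existing logs-fortinet_fortigate.log index template in Stack Management):

PUT _index_template/logs-fortinet_fortigate.log-fgt81xxxxx
{
  "index_patterns": ["logs-fortinet_fortigate.log-fgt81xxxxx"],
  "data_stream": {},
  "priority": 250,
  "composed_of": [
    "logs-fortinet_fortigate.log@package",
    "logs-fortinet_fortigate.log@custom"
  ],
  "template": {
    "settings": {
      "index.lifecycle.name": "fgt81-retention-policy"
    }
  }
}

The priority needs to be higher than the integration's managed template so your more specific pattern wins.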

@leandrojmp

I was able to successfully extract the devid and map it to the ds_namespace.
However, the final indexed logs still end up in logs-fortinet_fortigate.log-default,
which is the namespace configured in the elastic-agent.yml.

How can I correctly override `data_stream.namespace` so that logs go to:

  • logs-fortinet_fortigate.log-FGT81xxxxx
  • logs-fortinet_fortigate.log-FGT40xxxxx

Is there something wrong with my Logstash output configuration?

Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:         "@metadata" => {
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:                 "type" => "_doc",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:            "raw_index" => "logs-fortinet_fortigate.log-default",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:            "stream_id" => "udp-fortinet_fortigate.log-3e50017f-59a9-4009-951e-5b02516c74d9",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:         "ds_namespace" => "fgt81xxxxx",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:             "input_id" => "udp-fortinet_fortigate-3e50017f-59a9-4009-951e-5b02516c74d9",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:              "version" => "8.15.0",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:            "truncated" => false,
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:                 "beat" => "filebeat",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:                "input" => {
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:             "beats" => {
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:                 "host" => {
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:                     "ip" => "192.168.1.2"
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:                 }
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:             }
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:         }
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:     },
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:     "elastic_agent" => {
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:               "id" => "8d0b6350-5d48-46dd-9d70-c846d5357f4d",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:          "version" => "8.15.0",
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:         "snapshot" => false
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:     },
Apr 23 13:26:01 TWPORT-Logstash logstash[205012]:           "message" => "<189>date=2025-04-23 time=13:26:04 devname=\"FG-81F\" devid=\"FGT81xxxxx\" eventtime=1745385964365579510 tz=\"+0800\" logid=\"0000000013\" type=\"traffic\" subtype=\"forward\" level=\"notice\" vd=\"root\" srcip=10.1.110.71 srcport=51251 srcintf=\"wan1\" srcintfrole=\"wan\" dstip=192.168.1.34 dstport=7680 dstintf=\"internal\" dstintfrole=\"lan\" srccountry=\"Reserved\" dstcountry=\"Reserved\" sessionid=2473284264 proto=6 action=\"deny\" policyid=0 policytype=\"policy\" service=\"tcp/7680\" trandisp=\"noop\" appcat=\"unscanned\" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 rcvdpkt=0 crscore=30 craction=131072 crlevel=\"high\"",

Here is my current Logstash pipeline config:

input {
  elastic_agent {
    port => 5044
  }
}

filter {
  dissect {
    mapping => {
      "message" => '%{}devid="%{[@metadata][ds_namespace]}"%{}'
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://10.200.2.21:9200"]
    user => "elastic"
    password => "xxx"
    data_stream => true
    data_stream_type => "logs"
    data_stream_dataset => "fortinet_fortigate.log"
    data_stream_namespace => "%{[@metadata][ds_namespace]}"  # <== This is the key
    ssl => true
    ssl_certificate_verification => false
  }
}

I'm not sure exactly what went wrong, but one thing that is wrong is that you didn't use the mutate lowercase as mentioned; it is a requirement.

But it doesn't seem to matter in this case, as it looks like the data_stream_* settings are ignored and the @metadata.raw_index field is used instead; if the data_stream_* settings were being used, you would get an error because of the uppercase letters.

So your filter block would need to be a little more complicated, because you would need to change the value of the [@metadata][raw_index] field yourself.

I think something like this should work:

filter {
  dissect {
    mapping => {
      # capture only the devid value; the original message is left untouched
      "message" => '%{}devid="%{[@metadata][ds_namespace]}"%{}'
    }
  }
  mutate {
    # data stream names must be lowercase
    lowercase => ["[@metadata][ds_namespace]"]
  }
  mutate {
    # drop the raw_index set by the agent (logs-fortinet_fortigate.log-default)
    remove_field => ["[@metadata][raw_index]"]
  }
  # only rebuild raw_index when a devid was actually extracted
  if "fgt" in [@metadata][ds_namespace] {
    mutate {
      add_field => {
        "[@metadata][raw_index]" => "logs-fortinet_fortigate.log-%{[@metadata][ds_namespace]}"
      }
    }
  }
}

And then remove the data_stream_* settings from the output.
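
Something like this, keeping data_stream => true but without the type/dataset/namespace overrides (same hosts and credentials as in your post):

output {
  elasticsearch {
    hosts => ["https://10.200.2.21:9200"]
    user => "elastic"
    password => "xxx"
    # the full data stream name now comes from [@metadata][raw_index]
    data_stream => true
    ssl => true
    ssl_certificate_verification => false
  }
}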