Hi everyone,
I’m currently working on integrating logs from two FortiGate devices into Elasticsearch via the following pipeline:
FortiGate (x2) → Elastic Agent (Fortigate Integration) → Logstash (dynamic data_stream namespace) → Elasticsearch
Both FortiGate devices are sending logs via Elastic Agent to the same host (192.168.1.2:517), and here's a simplified version of my elastic-agent.yml config:
inputs:
  - id: udp-fortinet_fortigate-3e50017f-59a9-4009-951e-5b02516c74d9
    name: XXXXX_fortinet_fortigate
    revision: 5
    type: udp
    use_output: fleet-default-output
    meta:
      package:
        name: fortinet_fortigate
        version: 1.28.0
    data_stream:
      namespace: default
    package_policy_id: 3e50017f-59a9-4009-951e-5b02516c74d9
    streams:
      - id: udp-fortinet_fortigate.log-3e50017f-59a9-4009-951e-5b02516c74d9
        data_stream:
          dataset: fortinet_fortigate.log
          type: logs
        host: '192.168.1.2:517'
        tags:
          - preserve_original_event
          - fortinet-fortigate
          - fortinet-firewall
          - forwarded
        publisher_pipeline.disable_host: true
        processors:
          - add_fields:
              target: _temp
              fields:
                internal_networks:
                  - private
Here are two sample events, one from each device, as seen in the Logstash message field:
"message" => "<189>date=2025-04-22 time=17:07:01 devname="FG-81F" devid="FGT81xxxxx" eventtime=1745312820806637289 tz="+0800" logid="0001000014" type="traffic" subtype="local" level="notice" vd="root" srcip=fe80::20f:3aff:fea8:d71c srcport=546 srcintf="wan1" srcintfrole="wan" dstip=ff02::1:2 dstport=547 dstintf="unknown-0" dstintfrole="undefined" replysrcintf="root" sessionid=9199206 proto=17 action="deny" policyid=0 policytype="local-in-policy6" service="DHCP6" trandisp="noop" app="DHCP6" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 rcvdpkt=0 msg="Connection Failed""
"message" => "<189>date=2025-04-22 time=17:06:59 devname="FG-40F" devid="FGT40xxxxx" eventtime=1745312818809881780 tz="+0800" logid="0001000014" type="traffic" subtype="local" level="notice" vd="root" srcip=192.168.1.4 srcport=49794 srcintf="wan" srcintfrole="wan" dstip=255.255.255.255 dstport=1947 dstintf="unknown-0" dstintfrole="undefined" srccountry="Reserved" dstcountry="Reserved" sessionid=3031780 proto=17 action="deny" policyid=0 policytype="local-in-policy" service="udp/1947" trandisp="noop" app="udp/1947" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 rcvdpkt=0 msg="Connection Failed""
In Logstash, I want to extract devid from the log message and use it to dynamically route the event to a specific data_stream.namespace. My goal is to keep logs from different FortiGates logically separated in their own data streams.
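(For clarity, the extraction I'm after is equivalent to this small Python sketch; the helper name is just illustrative, it mirrors what the devid grok pattern is meant to capture:)

```python
import re

def extract_devid(message):
    # Pull the value of the devid="..." key out of a raw FortiGate
    # syslog line, like the grok pattern devid="%{DATA:device_id}".
    m = re.search(r'devid="([^"]+)"', message)
    return m.group(1) if m else None

sample = ('<189>date=2025-04-22 time=17:07:01 devname="FG-81F" '
          'devid="FGT81xxxxx" type="traffic" action="deny"')
print(extract_devid(sample))  # FGT81xxxxx
```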
Here's my current Logstash pipeline config:
input {
  elastic_agent {
    port => 5044
  }
}

filter {
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{IP:syslog_ip} %{GREEDYDATA:message}" }
    overwrite => ["message"]
  }
  grok {
    match => { "message" => 'devid="%{DATA:device_id}"' }
  }
  mutate {
    add_field => { "[@metadata][ds_namespace]" => "%{device_id}" }
  }
}

output {
  elasticsearch {
    hosts => ["https://10.200.2.21:9200"]
    user => "elastic"
    password => "xxx"
    data_stream => true
    data_stream_type => "logs"
    data_stream_dataset => "fortinet_fortigate.log"
    data_stream_namespace => "%{[@metadata][ds_namespace]}"  # <== This is the key
    ssl => true
    ssl_certificate_verification => false
  }
}
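One safeguard I'm considering adding (a sketch, assuming the devid grok can fail on some events, and given that data stream names must be lowercase; "unknown" is just a placeholder namespace I made up) is replacing the mutate block with:

  # Fallback so the namespace never becomes the literal string "%{device_id}"
  if ![device_id] {
    mutate { add_field => { "device_id" => "unknown" } }
  }
  mutate {
    add_field => { "[@metadata][ds_namespace]" => "%{device_id}" }
  }
  # Separate mutate, since add_field runs after lowercase within one block
  mutate {
    lowercase => ["[@metadata][ds_namespace]"]
  }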
Ultimately, I would like the logs to be routed into two separate data streams in Elasticsearch based on the devid of each FortiGate device, resulting in:

logs-fortinet_fortigate.log-FGT81xxxxx
logs-fortinet_fortigate.log-FGT40xxxxx
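(In other words, since a backing data stream's name is composed as type-dataset-namespace, the routing I want boils down to:)

```python
def data_stream_name(ds_type, dataset, namespace):
    # Elastic data stream naming scheme: <type>-<dataset>-<namespace>
    return f"{ds_type}-{dataset}-{namespace}"

print(data_stream_name("logs", "fortinet_fortigate.log", "FGT81xxxxx"))
# logs-fortinet_fortigate.log-FGT81xxxxx
```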
I want to dynamically route logs to different data_stream.namespace values without affecting the field parsing done by the Elastic Agent integration.

Is this approach using [@metadata][ds_namespace] correct for achieving that goal? Any advice or best practices would be greatly appreciated.

Thanks in advance.