Hi everyone,
I'd like to share my Logstash configuration for monitoring network interfaces via SNMP with bandwidth utilization calculation. This configuration collects interface statistics from network devices (tested with FortiGate), calculates bandwidth utilization in Mbps, and stores the data in Elasticsearch.
I had wanted to build this config for about two years but never found the time to learn the Ruby syntax and debug all the issues. Today, with some focus, I managed to write and test it within a few hours. The code was tested on FortiGate but should also work on Cisco ASA.
Key features:
- SNMP v3 interface monitoring
- Bandwidth calculation (Mbps and utilization percentage)
- Interface status tracking
- Custom interface naming
- Historical data caching
- Error handling and validation
The configuration includes:
- SNMP input with detailed interface metrics
- Data processing with Ruby filter
- Interface status text mapping
- Elasticsearch output
I've removed sensitive information and replaced it with placeholder values. You'll need to adjust the following:
- SNMP credentials
- Device IP addresses
- Elasticsearch connection details
- Interface mappings
- Hostname settings
IMPORTANT: This configuration is tuned for a 60-second polling interval. If you use a different interval, adjust the time delta validation in the Ruby filter, which currently accepts a measurement only if time_delta is between 55 and 65 seconds. The validation keeps the bandwidth calculation accurate by rejecting samples whose spacing does not match the polling interval.
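For other polling intervals, one way to avoid hard-coding 55 and 65 is to derive the window from the interval itself. This is only a sketch: the helper name `valid_time_delta?` and the 5-second tolerance are assumptions for illustration, not something Logstash provides.

```ruby
# Hypothetical helper: derive the accepted time_delta window from the polling
# interval instead of hard-coding 55..65. The default 5-second tolerance
# mirrors the 60-second setup and is an assumption, not a Logstash setting.
def valid_time_delta?(time_delta, poll_interval, tolerance = 5)
  lower = poll_interval - tolerance
  upper = poll_interval + tolerance
  time_delta >= lower && time_delta <= upper
end

puts valid_time_delta?(61, 60)    # inside the 55..65 window
puts valid_time_delta?(120, 60)   # a poll was missed, so the sample is rejected
puts valid_time_delta?(298, 300)  # 5-minute polling, window 295..305
```

Inside the Ruby filter, the same comparison would replace the `time_delta >= 55 && time_delta <= 65` check, with the interval kept in step with the `interval` setting of the SNMP input.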
The complete code is attached below. Feel free to modify it for your needs!
input {
snmp {
interval => 60
hosts => [
{ host => "udp:YOUR_DEVICE_IP/161" version => "3" retries => 5 }
]
security_name => "${SNMP_USER}"
auth_protocol => "sha"
auth_pass => "${SNMP_AUTH_PASS}"
priv_protocol => "aes128"
priv_pass => "${SNMP_PRIV_PASS}"
security_level => "authPriv"
tables => [
{
name => "interfaces"
columns => [
"1.3.6.1.2.1.2.2.1.1", # ifIndex
"1.3.6.1.2.1.2.2.1.3", # ifType
"1.3.6.1.2.1.2.2.1.4", # ifMtu
"1.3.6.1.2.1.2.2.1.5", # ifSpeed
"1.3.6.1.2.1.2.2.1.6", # ifPhysAddress
"1.3.6.1.2.1.2.2.1.7", # ifAdminStatus
"1.3.6.1.2.1.2.2.1.8", # ifOperStatus
"1.3.6.1.2.1.2.2.1.9", # ifLastChange
"1.3.6.1.2.1.31.1.1.1.6", # ifHCInOctets
"1.3.6.1.2.1.2.2.1.11", # ifInUcastPkts
"1.3.6.1.2.1.2.2.1.12", # ifInNUcastPkts
"1.3.6.1.2.1.2.2.1.13", # ifInDiscards
"1.3.6.1.2.1.2.2.1.14", # ifInErrors
"1.3.6.1.2.1.2.2.1.15", # ifInUnknownProtos
"1.3.6.1.2.1.31.1.1.1.10",# ifHCOutOctets
"1.3.6.1.2.1.2.2.1.17", # ifOutUcastPkts
"1.3.6.1.2.1.2.2.1.18", # ifOutNUcastPkts
"1.3.6.1.2.1.2.2.1.19", # ifOutDiscards
"1.3.6.1.2.1.2.2.1.20", # ifOutErrors
"1.3.6.1.2.1.31.1.1.1.1" # ifName
]
}
]
add_field => { "host" => "%{[@metadata][input][snmp][host][address]}" }
tags => ["snmp", "interface"]
}
}
filter {
split {
field => "interfaces"
}
mutate {
remove_field => ["[interfaces]", "[@version]"]
rename => {
"[interfaces][iso.org.dod.internet.mgmt.mib-2.ifMIB.ifMIBObjects.ifXTable.ifXEntry.ifHCOutOctets]" => "interface.ifOutOctets"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifType]" => "interface.ifType"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifSpeed]" => "interface.ifSpeed"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.ifMIB.ifMIBObjects.ifXTable.ifXEntry.ifName]" => "interface.ifName"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOutUcastPkts]" => "interface.ifOutUcastPkts"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifInNUcastPkts]" => "interface.ifInNUcastPkts"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifInUcastPkts]" => "interface.ifInUcastPkts"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOutNUcastPkts]" => "interface.ifOutNUcastPkts"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifInDiscards]" => "interface.ifInDiscards"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOutErrors]" => "interface.ifOutErrors"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOperStatus]" => "interface.ifOperStatus"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifIndex]" => "interface.ifIndex"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifInErrors]" => "interface.ifInErrors"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOutDiscards]" => "interface.ifOutDiscards"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.ifMIB.ifMIBObjects.ifXTable.ifXEntry.ifHCInOctets]" => "interface.ifInOctets"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifPhysAddress]" => "interface.ifPhysAddress"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifLastChange]" => "interface.ifLastChange"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifMtu]" => "interface.ifMtu"
"[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifAdminStatus]" => "interface.ifAdminStatus"
}
rename => { "host" => "host.ip" }
add_field => { "host.hostname" => "YOUR_DEVICE_NAME" }
}
ruby {
code => '
require "json"
require "fileutils"
# Directory for storing interface statistics cache
cache_dir = "/etc/logstash/interfaces_cache/interface_stats"
FileUtils.mkdir_p(cache_dir)
# Define interface name mappings
interface_mappings = {
"port1" => "WAN",
"port2" => "LAN",
"port3" => "DMZ"
}
# Get custom name for interface
def get_custom_name(if_name, mappings)
mappings[if_name]
end
# Calculate bandwidth utilization
def calculate_utilization(current, previous, time_delta, speed)
return [0.0, 0.0] if speed.to_i == 0 || time_delta == 0
delta = (current.to_i - previous.to_i).abs
bits_per_second = (delta * 8.0) / time_delta
mbps = (bits_per_second / 1_000_000.0).round(2) # Convert to Mbps
[mbps, ((bits_per_second / speed.to_f) * 100.0).round(2)]
end
begin
host_ip = event.get("host.ip").to_s.gsub(".", "_")
if_index = event.get("interface.ifIndex").to_s
interface_name = event.get("interface.ifName").to_s
custom_name = get_custom_name(interface_name, interface_mappings)
if custom_name
event.set("interface.ifCustomName", custom_name)
event.set("interface.ifCustomNameIn", "IN #{custom_name}...")
event.set("interface.ifCustomNameOut", "OUT #{custom_name}...")
end
cache_file = File.join(cache_dir, "#{host_ip}_#{if_index}.json")
current_time = Time.now.to_i
current_in = event.get("interface.ifInOctets").to_i
current_out = event.get("interface.ifOutOctets").to_i
speed = event.get("interface.ifSpeed").to_i
if File.exist?(cache_file)
previous = JSON.parse(File.read(cache_file))
time_delta = current_time - previous["timestamp"].to_i
if time_delta >= 55 && time_delta <= 65 && speed > 0
in_mbps, in_utilization = calculate_utilization(
current_in,
previous["in_octets"],
time_delta,
speed
)
out_mbps, out_utilization = calculate_utilization(
current_out,
previous["out_octets"],
time_delta,
speed
)
event.set("interface.inUtilization", in_utilization)
event.set("interface.outUtilization", out_utilization)
event.set("interface.inMbps", in_mbps)
event.set("interface.outMbps", out_mbps)
event.set("interface.timeDelta", time_delta)
event.set("interface.measurementValid", true)
event.set("interface.lastMeasurementTime", previous["timestamp"])
else
event.set("interface.measurementValid", false)
event.set("interface.timeDelta", time_delta)
event.set("interface.lastMeasurementTime", previous["timestamp"])
event.set("interface.invalidReason", speed > 0 ? "Invalid time delta" : "Speed is 0")
event.set("interface.inMbps", 0.0)
event.set("interface.outMbps", 0.0)
end
else
event.set("interface.measurementValid", false)
event.set("interface.timeDelta", 0)
event.set("interface.invalidReason", "No previous measurement")
event.set("interface.inMbps", 0.0)
event.set("interface.outMbps", 0.0)
end
current_data = {
"timestamp" => current_time,
"measurement_time" => Time.now.utc.strftime("%Y-%m-%d %H:%M:%S UTC"),
"in_octets" => current_in,
"out_octets" => current_out,
"speed" => speed,
"interface_name" => interface_name,
"custom_name" => custom_name,
"host_ip" => event.get("host.ip"),
"if_index" => if_index
}
File.write(cache_file, JSON.generate(current_data))
rescue => e
event.set("ruby_error", e.message)
event.set("interface.measurementValid", false)
event.set("interface.invalidReason", "Error: #{e.message}")
event.set("interface.inMbps", 0.0)
event.set("interface.outMbps", 0.0)
end
'
}
# Map interface operational status to text (IF-MIB ifOperStatus values)
if [interface.ifOperStatus] == 1 {
mutate { add_field => { "interface.ifOperStatusText" => "up" } }
} else if [interface.ifOperStatus] == 2 {
mutate { add_field => { "interface.ifOperStatusText" => "down" } }
} else if [interface.ifOperStatus] == 3 {
mutate { add_field => { "interface.ifOperStatusText" => "testing" } }
} else if [interface.ifOperStatus] == 4 {
mutate { add_field => { "interface.ifOperStatusText" => "unknown" } }
} else if [interface.ifOperStatus] == 5 {
mutate { add_field => { "interface.ifOperStatusText" => "dormant" } }
} else if [interface.ifOperStatus] == 6 {
mutate { add_field => { "interface.ifOperStatusText" => "notPresent" } }
} else if [interface.ifOperStatus] == 7 {
mutate { add_field => { "interface.ifOperStatusText" => "lowerLayerDown" } }
}
# Map interface administrative status to text
if [interface.ifAdminStatus] == 1 {
mutate { add_field => { "interface.ifAdminStatusText" => "up" } }
} else if [interface.ifAdminStatus] == 2 {
mutate { add_field => { "interface.ifAdminStatusText" => "down" } }
} else if [interface.ifAdminStatus] == 3 {
mutate { add_field => { "interface.ifAdminStatusText" => "testing" } }
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["YOUR_ELASTICSEARCH_HOST:PORT"]
index => "network-devices-%{+yyyy.MM}"
user => "${ES_USER}"
password => "${ES_PASSWORD}"
ssl => true
ssl_certificate_verification => true
cacert => "/path/to/ca.crt"
keystore => "/path/to/keystore.p12"
keystore_password => "${KEYSTORE_PASSWORD}"
}
}
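To check the bandwidth arithmetic outside Logstash, the calculation from the Ruby filter can be tried as a standalone script. This is a rough sketch rather than the filter itself: the function name `interface_rate` and the choice to return nil when the counter went backwards (for example after a device reboot) are assumptions, whereas the filter above takes the absolute value of the delta.

```ruby
# Hypothetical standalone version of the rate calculation.
# current_octets / previous_octets: ifHCInOctets or ifHCOutOctets samples
# time_delta: seconds between the two samples
# speed_bps: ifSpeed in bits per second
def interface_rate(current_octets, previous_octets, time_delta, speed_bps)
  return nil if time_delta <= 0 || speed_bps <= 0

  delta = current_octets - previous_octets
  # Assumption: a counter that went backwards means a reset, so no rate is reported.
  return nil if delta < 0

  bits_per_second = (delta * 8.0) / time_delta
  {
    mbps: (bits_per_second / 1_000_000.0).round(2),
    utilization_pct: ((bits_per_second / speed_bps) * 100.0).round(2)
  }
end

# 750,000,000 octets in 60 s on a 1 Gbps link:
# 750e6 * 8 / 60 = 100,000,000 bit/s, i.e. 100 Mbps or 10 % of the link speed.
rate = interface_rate(1_750_000_000, 1_000_000_000, 60, 1_000_000_000)
puts "#{rate[:mbps]} Mbps, #{rate[:utilization_pct]} %"

# Counter went backwards: no rate is returned.
puts interface_rate(500, 1_000, 60, 1_000_000_000).inspect
```

Note that ifSpeed is a 32-bit gauge in bits per second, so it cannot represent links faster than about 4.29 Gbps; for such interfaces the utilization percentage would need a speed taken from elsewhere (IF-MIB offers ifHighSpeed in Mbps, which this config does not poll).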
Results
Ping "Expression" Code:
kibana
| selectFilter
| essql
query="SELECT monitor.duration.us/1000 as latency_ms
FROM \"heartbeat-*\"
WHERE monitor.ip = 'SOME_IP_10.10.10.10'
AND \"@timestamp\" > NOW() - INTERVAL 5 MINUTE
"
| math "mean(latency_ms)"
| metric
metricFont={font align="center" color={switch case={case if={lt 100} then="green"} case={case if={all {gte 100} {lt 200}} then="yellow"} case={case if={gte 200} then="red"} default="black"} family="'Open Sans', Helvetica, Arial, sans-serif" italic=false size=48 underline=false weight="normal"} metricFormat="0.0" "ms"
| render css=""
Utilization "Expression" Code:
kibana
| selectFilter
| essql
query="SELECT *
FROM \"network-devices-*\"
WHERE interface.ifName = 'port1'
AND host.ip = 'SOME_IP_10.10.10.10'
AND \"@timestamp\" > NOW() - INTERVAL 5 MINUTES"
| math "mean(interface.inMbps)"
| metric
metricFont={font align="center" color={switch case={case if={lt 700} then="green"} case={case if={all {gte 700} {lt 800}} then="yellow"} case={case if={gte 800} then="red"} default="black"} family="'Open Sans', Helvetica, Arial, sans-serif" italic=false size=48 underline=false weight="normal"}
labelFont={font align="center" color="#000000" family="'Open Sans', Helvetica, Arial, sans-serif" italic=false size=18 underline=false weight="normal"} metricFormat="0" "LAN HQ IN"
| render
css=".canvasRenderEl {
position: relative;
border-radius: 125px;
padding: 25px;
background: transparent;
}
.canvasRenderEl::before {
content: \"\";
position: absolute;
inset: 0;
padding: 5px;
background: var(--dynamic-gradient, linear-gradient(145deg, #4facfe, #00f2fe, #a742ff));
border-radius: inherit;
-webkit-mask:
linear-gradient(#fff 0 0) content-box,
linear-gradient(#fff 0 0);
-webkit-mask-composite: xor;
mask-composite: exclude;
pointer-events: none;
}
"