Logstash SNMP Interface Monitoring Configuration with Bandwidth Utilization Calculation

Hi everyone,

I'd like to share my Logstash configuration for monitoring network interfaces via SNMP with bandwidth utilization calculation. This configuration collects interface statistics from network devices (tested with FortiGate), calculates bandwidth utilization in Mbps, and stores the data in Elasticsearch.

I've wanted to build this config for the past two years but never had the time to learn the Ruby syntax and debug all the issues. Today, with some focus, I managed to create and test it within a few hours. It was tested on FortiGate but should also work on Cisco ASA.

Key features:

  • SNMP v3 interface monitoring
  • Bandwidth calculation (Mbps and utilization percentage)
  • Interface status tracking
  • Custom interface naming
  • Historical data caching
  • Error handling and validation

The configuration includes:

  1. SNMP input with detailed interface metrics
  2. Data processing with Ruby filter
  3. Interface status text mapping
  4. Elasticsearch output

I've removed sensitive information and replaced it with placeholder values. You'll need to adjust the following:

  • SNMP credentials
  • Device IP addresses
  • Elasticsearch connection details
  • Interface mappings
  • Hostname settings

:warning: IMPORTANT: This configuration is tuned for a 60-second polling interval. If you use a different interval, adjust the time delta validation in the Ruby filter, which currently accepts a measurement only when time_delta is between 55 and 65 seconds. This check keeps the bandwidth calculation accurate by rejecting samples taken too close together or too far apart.
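
If you poll at a different interval, one way to avoid hard-coding 55 and 65 is to derive the window from the interval. This is only a minimal sketch and not part of the config below: `POLL_INTERVAL`, `TOLERANCE`, and `valid_time_delta?` are hypothetical names, and the 5-second tolerance simply mirrors the 55-65 window used for 60-second polling.

```ruby
# Hypothetical helper: derive the accepted time_delta window from the
# polling interval instead of hard-coding 55..65.
POLL_INTERVAL = 60   # seconds; keep in sync with `interval` in the snmp input
TOLERANCE     = 5    # seconds of jitter accepted on either side (assumption)

def valid_time_delta?(time_delta, interval = POLL_INTERVAL, tolerance = TOLERANCE)
  time_delta.between?(interval - tolerance, interval + tolerance)
end

puts valid_time_delta?(58)        # within 55..65
puts valid_time_delta?(120)       # a poll was missed, so the sample is rejected
puts valid_time_delta?(298, 300)  # 5-minute polling, within 295..305
```

Inside the Logstash ruby filter you would probably inline this with local variables rather than constants, since the filter code runs once per event.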

The complete code is attached below. Feel free to modify it for your needs!

input {
    snmp {
        interval => 60
        hosts => [
            { host => "udp:YOUR_DEVICE_IP/161" version => "3" retries => 5 }
        ]
        security_name => "${SNMP_USER}"
        auth_protocol => "sha"
        auth_pass => "${SNMP_AUTH_PASS}"
        priv_protocol => "aes128"
        priv_pass => "${SNMP_PRIV_PASS}"
        security_level => "authPriv"

        tables => [
            {
                name => "interfaces"
                columns => [
                    "1.3.6.1.2.1.2.2.1.1",    # ifIndex
                    "1.3.6.1.2.1.2.2.1.3",    # ifType
                    "1.3.6.1.2.1.2.2.1.4",    # ifMtu
                    "1.3.6.1.2.1.2.2.1.5",    # ifSpeed
                    "1.3.6.1.2.1.2.2.1.6",    # ifPhysAddress
                    "1.3.6.1.2.1.2.2.1.7",    # ifAdminStatus
                    "1.3.6.1.2.1.2.2.1.8",    # ifOperStatus
                    "1.3.6.1.2.1.2.2.1.9",    # ifLastChange
                    "1.3.6.1.2.1.31.1.1.1.6", # ifHCInOctets
                    "1.3.6.1.2.1.2.2.1.11",   # ifInUcastPkts
                    "1.3.6.1.2.1.2.2.1.12",   # ifInNUcastPkts
                    "1.3.6.1.2.1.2.2.1.13",   # ifInDiscards
                    "1.3.6.1.2.1.2.2.1.14",   # ifInErrors
                    "1.3.6.1.2.1.2.2.1.15",   # ifInUnknownProtos
                    "1.3.6.1.2.1.31.1.1.1.10",# ifHCOutOctets
                    "1.3.6.1.2.1.2.2.1.17",   # ifOutUcastPkts
                    "1.3.6.1.2.1.2.2.1.18",   # ifOutNUcastPkts
                    "1.3.6.1.2.1.2.2.1.19",   # ifOutDiscards
                    "1.3.6.1.2.1.2.2.1.20",   # ifOutErrors
                    "1.3.6.1.2.1.31.1.1.1.1"  # ifName
                ]
            }
        ]
        add_field => { "host" => "%{[@metadata][input][snmp][host][address]}" }
        tags => ["snmp", "interface"]
    }
}

filter {
    split {
        field => "interfaces"
    }

    mutate {
        remove_field => ["[interfaces]", "[@version]"]
        rename => {
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.ifMIB.ifMIBObjects.ifXTable.ifXEntry.ifHCOutOctets]" => "interface.ifOutOctets"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifType]" => "interface.ifType"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifSpeed]" => "interface.ifSpeed"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.ifMIB.ifMIBObjects.ifXTable.ifXEntry.ifName]" => "interface.ifName"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOutUcastPkts]" => "interface.ifOutUcastPkts"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifInNUcastPkts]" => "interface.ifInNUcastPkts"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifInUcastPkts]" => "interface.ifInUcastPkts"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOutNUcastPkts]" => "interface.ifOutNUcastPkts"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifInDiscards]" => "interface.ifInDiscards"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOutErrors]" => "interface.ifOutErrors"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOperStatus]" => "interface.ifOperStatus"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifIndex]" => "interface.ifIndex"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifInErrors]" => "interface.ifInErrors"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifOutDiscards]" => "interface.ifOutDiscards"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.ifMIB.ifMIBObjects.ifXTable.ifXEntry.ifHCInOctets]" => "interface.ifInOctets"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifPhysAddress]" => "interface.ifPhysAddress"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifLastChange]" => "interface.ifLastChange"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifMtu]" => "interface.ifMtu"
            "[interfaces][iso.org.dod.internet.mgmt.mib-2.interfaces.ifTable.ifEntry.ifAdminStatus]" => "interface.ifAdminStatus"
        }
        rename => { "host" => "host.ip" }
        add_field => { "host.hostname" => "YOUR_DEVICE_NAME" }
    }

    ruby {
        code => '
            require "json"
            require "fileutils"

            # Directory for storing interface statistics cache
            cache_dir = "/etc/logstash/interfaces_cache/interface_stats"
            FileUtils.mkdir_p(cache_dir)

            # Define interface name mappings
            interface_mappings = {
                "port1" => "WAN",
                "port2" => "LAN",
                "port3" => "DMZ"
            }

            # Get custom name for interface
            def get_custom_name(if_name, mappings)
                mappings[if_name]
            end

            # Calculate throughput (Mbps) and utilization (%) between two samples
            def calculate_utilization(current, previous, time_delta, speed)
                return [0.0, 0.0] if speed.to_i == 0 || time_delta == 0
                delta = current.to_i - previous.to_i
                # A negative delta means the counter was reset (e.g. device reboot)
                return [0.0, 0.0] if delta < 0
                bits_per_second = (delta * 8.0) / time_delta
                mbps = (bits_per_second / 1_000_000.0).round(2)  # Convert to Mbps
                [mbps, ((bits_per_second / speed.to_f) * 100.0).round(2)]
            end

            begin
                host_ip = event.get("host.ip").to_s.gsub(".", "_")
                if_index = event.get("interface.ifIndex").to_s
                interface_name = event.get("interface.ifName").to_s

                custom_name = get_custom_name(interface_name, interface_mappings)
                if custom_name
                    event.set("interface.ifCustomName", custom_name)
                    event.set("interface.ifCustomNameIn", "IN #{custom_name}...")
                    event.set("interface.ifCustomNameOut", "OUT #{custom_name}...")
                end

                cache_file = File.join(cache_dir, "#{host_ip}_#{if_index}.json")

                current_time = Time.now.to_i
                current_in = event.get("interface.ifInOctets").to_i
                current_out = event.get("interface.ifOutOctets").to_i
                speed = event.get("interface.ifSpeed").to_i

                if File.exist?(cache_file)
                    previous = JSON.parse(File.read(cache_file))
                    time_delta = current_time - previous["timestamp"].to_i

                    if time_delta >= 55 && time_delta <= 65 && speed > 0
                        in_mbps, in_utilization = calculate_utilization(
                            current_in,
                            previous["in_octets"],
                            time_delta,
                            speed
                        )
                        out_mbps, out_utilization = calculate_utilization(
                            current_out,
                            previous["out_octets"],
                            time_delta,
                            speed
                        )

                        event.set("interface.inUtilization", in_utilization)
                        event.set("interface.outUtilization", out_utilization)
                        event.set("interface.inMbps", in_mbps)
                        event.set("interface.outMbps", out_mbps)
                        event.set("interface.timeDelta", time_delta)
                        event.set("interface.measurementValid", true)
                        event.set("interface.lastMeasurementTime", previous["timestamp"])
                    else
                        event.set("interface.measurementValid", false)
                        event.set("interface.timeDelta", time_delta)
                        event.set("interface.lastMeasurementTime", previous["timestamp"])
                        event.set("interface.invalidReason", speed > 0 ? "Invalid time delta" : "Speed is 0")
                        event.set("interface.inMbps", 0.0)
                        event.set("interface.outMbps", 0.0)
                    end
                else
                    event.set("interface.measurementValid", false)
                    event.set("interface.timeDelta", 0)
                    event.set("interface.invalidReason", "No previous measurement")
                    event.set("interface.inMbps", 0.0)
                    event.set("interface.outMbps", 0.0)
                end

                current_data = {
                    "timestamp" => current_time,
                    "measurement_time" => Time.now.utc.strftime("%Y-%m-%d %H:%M:%S UTC"),
                    "in_octets" => current_in,
                    "out_octets" => current_out,
                    "speed" => speed,
                    "interface_name" => interface_name,
                    "custom_name" => custom_name,
                    "host_ip" => event.get("host.ip"),
                    "if_index" => if_index
                }

                File.write(cache_file, JSON.generate(current_data))
            rescue => e
                event.set("ruby_error", e.message)
                event.set("interface.measurementValid", false)
                event.set("interface.invalidReason", "Error: #{e.message}")
                event.set("interface.inMbps", 0.0)
                event.set("interface.outMbps", 0.0)
            end
        '
    }

    # Map interface operational status to text (IF-MIB ifOperStatus values)
    if [interface.ifOperStatus] == 1 {
        mutate { add_field => { "interface.ifOperStatusText" => "up" } }
    } else if [interface.ifOperStatus] == 2 {
        mutate { add_field => { "interface.ifOperStatusText" => "down" } }
    } else if [interface.ifOperStatus] == 3 {
        mutate { add_field => { "interface.ifOperStatusText" => "testing" } }
    } else if [interface.ifOperStatus] == 4 {
        mutate { add_field => { "interface.ifOperStatusText" => "unknown" } }
    } else if [interface.ifOperStatus] == 5 {
        mutate { add_field => { "interface.ifOperStatusText" => "dormant" } }
    } else if [interface.ifOperStatus] == 6 {
        mutate { add_field => { "interface.ifOperStatusText" => "notPresent" } }
    } else if [interface.ifOperStatus] == 7 {
        mutate { add_field => { "interface.ifOperStatusText" => "lowerLayerDown" } }
    }

    # Map interface administrative status to text
    if [interface.ifAdminStatus] == 1 {
        mutate { add_field => { "interface.ifAdminStatusText" => "up" } }
    } else if [interface.ifAdminStatus] == 2 {
        mutate { add_field => { "interface.ifAdminStatusText" => "down" } }
    } else if [interface.ifAdminStatus] == 3 {
        mutate { add_field => { "interface.ifAdminStatusText" => "testing" } }
    }
}

output {
    stdout { codec => rubydebug }
    
    elasticsearch {
        hosts => ["YOUR_ELASTICSEARCH_HOST:PORT"]
        index => "network-devices-%{+yyyy.MM}"
        user => "${ES_USER}"
        password => "${ES_PASSWORD}"
        ssl => true
        ssl_certificate_verification => true
        cacert => "/path/to/ca.crt"
        keystore => "/path/to/keystore.p12"
        keystore_password => "${KEYSTORE_PASSWORD}"
    }
}
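
To make the arithmetic in the Ruby filter concrete: the rate is the octet delta times 8, divided by the seconds between polls, and utilization is that rate as a percentage of ifSpeed. Here is a standalone sketch you can run outside Logstash. `rate_and_utilization` is a hypothetical name, and returning nil when the counter goes backwards is an assumption of this sketch about how to treat a counter reset, not something taken from the config above.

```ruby
# Standalone sketch of the bandwidth arithmetic (hypothetical helper name).
# Returns [mbps, utilization_percent], or nil when no rate can be computed.
def rate_and_utilization(current_octets, previous_octets, time_delta, if_speed_bps)
  return nil if time_delta <= 0 || if_speed_bps <= 0

  delta = current_octets - previous_octets
  # Assumption: a counter that went backwards means the device rebooted or
  # the counter was reset, so this sample is skipped.
  return nil if delta < 0

  bits_per_second = (delta * 8.0) / time_delta
  mbps = (bits_per_second / 1_000_000.0).round(2)
  utilization = ((bits_per_second / if_speed_bps) * 100.0).round(2)
  [mbps, utilization]
end

# 750,000,000 octets transferred in 60 s on a 1 Gbps link
p rate_and_utilization(1_750_000_000, 1_000_000_000, 60, 1_000_000_000)
# => [100.0, 10.0]
```

One thing to keep in mind: ifSpeed is a 32-bit gauge, so it tops out at 4,294,967,295 bps. For interfaces faster than that, ifHighSpeed (1.3.6.1.2.1.31.1.1.1.15, reported in Mbps) is the usual alternative. I have not tested that variant with this config.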

Results

Ping "Expression" Code:

kibana
| selectFilter
| essql 
  query="SELECT monitor.duration.us/1000 as latency_ms
FROM \"heartbeat-*\"
WHERE monitor.ip = 'SOME_IP_10.10.10.10'
    AND \"@timestamp\" > NOW() - INTERVAL 5 MINUTE
"
| math "mean(latency_ms)"
| metric 
  metricFont={font align="center" color={switch case={case if={lt 100} then="green"} case={case if={all {gte 100} {lt 200}} then="yellow"} case={case if={gte 200} then="red"} default="black"} family="'Open Sans', Helvetica, Arial, sans-serif" italic=false size=48 underline=false weight="normal"} metricFormat="0.0" "ms"
| render css=""

Utilization "Expression" Code:

kibana
| selectFilter
| essql 
  query="SELECT *
FROM \"network-devices-*\"
WHERE interface.ifName = 'port1'
    AND host.ip = 'SOME_IP_10.10.10.10'
    AND \"@timestamp\" > NOW() - INTERVAL 5 MINUTES"
| math "mean(interface.inMbps)"
| metric 
  metricFont={font align="center" color={switch case={case if={lt 700} then="green"} case={case if={all {gte 700} {lt 900}} then="yellow"} case={case if={gte 900} then="red"} default="black"} family="'Open Sans', Helvetica, Arial, sans-serif" italic=false size=48 underline=false weight="normal"} 
  labelFont={font align="center" color="#000000" family="'Open Sans', Helvetica, Arial, sans-serif" italic=false size=18 underline=false weight="normal"} metricFormat="0" "LAN HQ IN"
| render 
  css=".canvasRenderEl {
    position: relative;
    border-radius: 125px;
    padding: 25px;
    background: transparent;
}

.canvasRenderEl::before {
    content: \"\";
    position: absolute;
    inset: 0;
    padding: 5px;
    background: var(--dynamic-gradient, linear-gradient(145deg, #4facfe, #00f2fe, #a742ff));
    border-radius: inherit;
    -webkit-mask: 
        linear-gradient(#fff 0 0) content-box,
        linear-gradient(#fff 0 0);
    -webkit-mask-composite: xor;
    mask-composite: exclude;
    pointer-events: none;
}
"