Gradually introducing a portion of traffic to a new Logstash configuration

Hi all, my colleague and I have been working on using the memcached filter plugin. We've got memcached fed with inventory data about our various network equipment.

The goal: I want to label all syslog traffic from our networking devices with information about what that device is, such as:

  • is it a distribution switch, access switch, wireless controller, firewall, etc.
  • which building and network cabinet is it in, etc.
    ... and more besides.

Pretty useful if you want to aggregate a huge volume of network logs.

To this end I've introduced the following configuration:

filter {
  if ...is-syslog... and [syslog-host-ip] == "... single IP ..." {
    memcached {
      id => "networking.memcached.32"
      hosts => ["127.0.0.1"]
      namespace => "network_devices"
      get => {
        "%{syslog-host-ip}" => "[@metadata][network_devices_temp]"
      }
    }
    json {
      id => "networking.json.41"
      source => "[@metadata][network_devices_temp]"
      target => "source_device"
    }
  }
}
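For context, the value stored in memcached is a JSON document describing the device, which the json filter above parses out of @metadata into [source_device]. A minimal sketch of that parse (the field names here are illustrative, not from our real inventory):

```ruby
require 'json'

# Hypothetical inventory record, as it might be stored in memcached under
# the device's IP. Field names are made up for illustration.
record = '{"role":"access-switch","building":"B2","cabinet":"C-14"}'

# The json filter does the equivalent of this parse, writing the result
# into the [source_device] field on the event.
source_device = JSON.parse(record)
puts source_device["role"]     # "access-switch"
puts source_device["cabinet"]  # "C-14"
```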

This works (now), but I had to constrain it to just a single IP as I noticed it was performing worse than my Elasticsearch output (which is generally the bottleneck).

# curl -s "http://127.0.0.1:9601/_node/stats/pipelines/main" | jq '.pipelines.main.plugins.filters[] | select(.id == "networking.memcached.32")'
{
  "id": "networking.memcached.32",
  "name": "memcached",
  "events": {
    "in": 15143,
    "duration_in_millis": 2230,
    "out": 15143
  }
}
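Those stats work out to roughly 0.15 ms per event spent in the memcached filter; a quick sanity check of the arithmetic:

```ruby
# Per-event cost of the memcached filter, from the node stats above.
duration_in_millis = 2230
events_out = 15143

ms_per_event = duration_in_millis.to_f / events_out
puts format("%.3f ms per event", ms_per_event)  # ~0.147 ms
```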

(the 32 in the ID is based on the line number; just a way of making the IDs unique without thinking too hard about naming things)

Now that I know it is functionally working, I want to start introducing it to more traffic so I can observe the impact on the pipeline.

I would like to be able to process, say, every Nth record, or some percentage of the traffic, or possibly every 10 sequential records out of every 1000 (or have a duty cycle of 5 seconds per minute).

I suppose I could use a ruby filter to do some percentage, but I'm wondering if there is something more commonly used, or perhaps something like awk's NR variable? I know the drop filter has a percentage attribute, but I'm not interested in dropping them :slight_smile:
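If you did go the ruby filter route, the every-Nth logic is only a few lines. A sketch of the counter in plain Ruby (N is illustrative, and in a real pipeline the counter would live in the filter's state and you'd add a tag to the event rather than return a flag; note also that with multiple pipeline workers each worker keeps its own counter):

```ruby
# Every-Nth sampling sketch, as it might run inside a Logstash ruby filter.
N = 100
counter = 0

sample_event = lambda do
  counter += 1
  (counter % N).zero?   # true for every Nth event
end

picked = (1..1000).count { sample_event.call }
puts picked   # 10 out of 1000 events would be passed to memcached
```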

You can use a throttle filter to tag a subset of the events and then use a conditional to test whether the tag was added.

Thanks. Here's my worked solution, should others wish to play.

# To setup:
# 
# For TCP memcached on 127.0.0.1:11211
#
# memcached -d -l 127.0.0.1
# for i in `seq 1 10`
# do
#   echo -en 'set perf:10.0.0.'$i' 0 10000 100\r\n1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890\r\n' | nc 127.0.0.1 11211
# done

input {
  generator {
    lines => [
      "10.0.0.1",
      "10.0.0.2",
      "10.0.0.3",
      "10.0.0.4",
      "10.0.0.5",
      "10.0.0.6",
      "10.0.0.7",
      "10.0.0.8",
      "10.0.0.9",
      "10.0.0.10"
    ]
  }
}

filter {

  throttle {
    before_count => 1000
    period => 10
    max_age => 20
    key => "any"
    add_tag => "pass_to_memcache"
  }

  if "pass_to_memcache" in [tags] {
    memcached {
      hosts => ["127.0.0.1:11211"]
      namespace => "perf"
      id => "memcached-get"
      get => {
        "%{message}" => "ipinfo"
      }
    }
  }

  # drop {}
}

output {
  stdout { codec => "json_lines" }
}

# In another window...
#
# curl -s 127.0.0.1:9602/_node/stats/pipelines/main | jq '.pipelines.main.plugins.filters[] | select(.id == "memcached-get") | (.events.duration_in_millis / .events.out)'
#
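For what it's worth, the `set` line in the setup comments above follows the memcached text protocol: key, flags, TTL in seconds, then the byte count of the payload, with the payload itself on the next line. A sketch that rebuilds one of those commands:

```ruby
# Rebuild one memcached text-protocol "set" command from the setup loop:
#   set <key> <flags> <ttl-seconds> <byte-count>\r\n<payload>\r\n
key   = "perf:10.0.0.1"
flags = 0
ttl   = 10_000
value = "1234567890" * 10   # the 100-byte payload used in the setup script

cmd = "set #{key} #{flags} #{ttl} #{value.bytesize}\r\n#{value}\r\n"
puts cmd.lines.first   # the same command line the setup script pipes to nc
```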
