Logstash, nested JSON and AWS ELB logs


(alexander) #1

I've been using Logstash to parse my AWS ELB logs for a little while, but I need to change my parsing because it's causing the Elasticsearch mapping to grow.

What I have currently is this:

if [type] == "elb" {
  grok {
    match => [ "message", "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport:int} (?:(%{IP:backendip}:?:%{INT:backendport:int})|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{NUMBER:response_processing_time:float} %{INT:response:int} %{INT:backend_response:int} %{INT:received_bytes:int} %{INT:bytes:int} \"(?:%{WORD:verb} (?<request>%{URIPROTO:proto}://(?:%{USER}(?::[^@]*)?@)?(?<urihost>((?:%{IP}|\b(?:[0-9A-Za-z][0-9A-Za-z\-_]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z\-_]{0,62}))*(\.?|\b))(?::%{POSINT:port})?))(?:%{URIPATH:path}(?<params>\?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>\^\`]*)?)?)(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" \"%{DATA:useragent}\" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol}"]  }
  date {
    match => [ "timestamp", "ISO8601" ]
  }
  
  if [params] {
    mutate {
      rename => { "params" => "params[request]" }
    }
    urldecode {
      field => "params[request]"
    }
    kv {
      source => "params[request]"
      field_split => "?&"
      target => "params"
    }
    mutate {
      split => { "params[document_ids]" => "," }
    }
  }
}

I'm including this so you can see what I've been doing. The problem comes from the kv filter: it creates a new field under params for every request parameter it finds in my log lines, so the set of fields keeps growing, since it converts the input into key-value pairs without any filtering.
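
To illustrate the growth outside Logstash, here is a minimal plain-Ruby sketch. `parse_query` is a hypothetical stand-in for what the kv filter does with `field_split => "?&"`, not the filter's actual implementation, and the sample requests are made up. Every distinct parameter name ends up as its own key, so each new name seen in the logs would add another field to the mapping.

```ruby
# Hypothetical stand-in for the kv filter: split a query string on ? and &,
# then split each pair on the first =.
def parse_query(query)
  query.split(/[?&]/).reject(&:empty?).each_with_object({}) do |pair, fields|
    key, value = pair.split('=', 2)
    fields[key] = value
  end
end

# Made-up sample requests, for illustration only.
requests = [
  '?document_ids=1,2,3&user_id=42',
  '?user_id=42&page=2',
  '?utm_source=mail&page=3'
]

# Collect every distinct parameter name seen across the requests.
field_names = requests.flat_map { |q| parse_query(q).keys }.uniq
puts field_names.inspect
```

Three requests already produce four distinct field names, and nothing bounds that number as new parameters appear.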

After doing some reading around, I want to try out nested fields, specifically for the parameters section. So the JSON I've got currently looks like this:

"params": {
  "request": "?document_ids=1,2,3,4,5&user_id=1234567890",
  "document_ids": ["1", "2", "3", "4", "5"],
  "user_id": "1234567890"
}

As I understand it, with nested fields this would instead take this form:

"params": [
  {
    "key": "document_ids", 
    "value": ["1", "2", "3", "4", "5"]
  },
  {
    "key": "user_id",
    "value": "1234567890"
  }
]

Unfortunately I can't work out a way to do this. I tried using kv to extract them as key-value pairs and then feeding that into a ruby filter, but it errors out, and I wasn't sure whether it was the most efficient way to create these fields.

Does anyone have any thoughts on how I should be going about this?


(alexander) #2

OK, I more or less answered my own question, but I'd still appreciate feedback if anyone has any. What I ended up with was this:

if [type] == "elb" {
  grok {
    match => [ "message", "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport:int} (?:(%{IP:backendip}:?:%{INT:backendport:int})|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{NUMBER:response_processing_time:float} %{INT:response:int} %{INT:backend_response:int} %{INT:received_bytes:int} %{INT:bytes:int} \"(?:%{WORD:verb} (?<request>%{URIPROTO:proto}://(?:%{USER}(?::[^@]*)?@)?(?<urihost>((?:%{IP}|\b(?:[0-9A-Za-z][0-9A-Za-z\-_]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z\-_]{0,62}))*(\.?|\b))(?::%{POSINT:port})?))(?:%{URIPATH:path}(?<params>\?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>\^\`]*)?)?)(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" \"%{DATA:useragent}\" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol}"]  }
  date {
    match => [ "timestamp", "ISO8601" ]
  }
  
  if [params] {
    mutate {
      rename => { "params" => "params[request]" }
    }
    urldecode {
      field => "params[request]"
    }
    kv {
      source => "params[request]"
      field_split => "?&"
      target => "params"
    }
    ruby {
      init => "require 'json'"
      code => "
        count = 0
        event['params'].to_hash.each {|k,v|
          if k == 'request' then
            next
          end
          path = '[arguments][%d]' % count
          event[path+'[key]'] = k
          event[path+'[value]'] = v
          count = count.next
        }
        event['[arguments]'] = event['[arguments]'].to_hash.values
      "
      remove_field => [ "params" ]
    }
  }
}

It feels a bit ugly, since I could only get it to work by building a Hash, which is then converted into an Array, but I've never written Ruby before, so as a first pass it's fine.


(alexander) #3

For the record, here's what I ended up with:

ruby {
  code => "
    arguments = Array.new
    event['params'].to_hash.each {|k,v|
      if k == 'request' then
        next
      end
      arguments << { 'key' => k, 'value' => v }
    }
    unless arguments.empty?
      event['[arguments]'] = arguments
    end
  "
  remove_field => [ "params" ]
}
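
For anyone who wants to try the conversion outside Logstash, here is a minimal plain-Ruby sketch of the same logic. `params_to_arguments` is a hypothetical helper name, not part of Logstash, and the sample hash simply mirrors the example from the first post. Note that the `event['field']` syntax used above belongs to older Logstash releases; as far as I know, from Logstash 5.0 onward the ruby filter has to use `event.get` and `event.set` instead.

```ruby
# Plain-Ruby version of the conversion, runnable outside Logstash.
# `params_to_arguments` is a hypothetical helper name, not part of Logstash.
def params_to_arguments(params)
  params.reject { |key, _| key == 'request' }
        .map { |key, value| { 'key' => key, 'value' => value } }
end

# Sample input mirroring the example from the first post.
params = {
  'request'      => '?document_ids=1,2,3,4,5&user_id=1234567890',
  'document_ids' => %w[1 2 3 4 5],
  'user_id'      => '1234567890'
}

arguments = params_to_arguments(params)
puts arguments.inspect

# Inside a ruby filter on Logstash 5.0 or later, the equivalent calls would be
# event.get('params') to read and event.set('arguments', arguments) to write.
```

Building the array directly with `map` avoids the intermediate Hash from the earlier version, and an empty params hash simply yields an empty array.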
