Ruby filter for parsing deeply nested JSON in pipeline


#1

We use Logstash extensively within my team as our ETL engine. Things started out simple enough with log file data, but we are at a point where our primary use case is centered around Business Intelligence. All our Incident, Change, Problem, Knowledge, etc. tickets are pushed through our pipelines, where we perform extensive ETL to help speed things up on the reporting end. Because of the dynamic nature of the data, the ruby filter has become our friend: developing standalone plugins has too much overhead for the team, and the filter just works so dang well.

Anyway, one of our use cases requires pulling working logs from our tickets that contain JSON objects. Our model has us denormalizing as much data into the parent record (e.g. incident, change) as possible, so we needed a way to flatten those JSON objects into the parent document, as well as the ability to manipulate the data beforehand. There is a JSON filter, but it is very basic and doesn't handle nested objects very well.

After researching solutions on the interwebs, I was able to put together a nicely portable ruby filter and wanted to share it. Be forewarned: I am not a coder, and I am sure improvements can be made, but if it saves you some time then mission accomplished! Also, it isn't going to flatten arrays, but it should be easy enough to update the code to accommodate that need.

FYI, I am running Logstash 6.4.

I shared the code on repl.it so you can test it out for your needs. It includes an example JSON object.
Link to repl.it code for testing

Below is an example pipeline config for those not familiar with using the new script file (`path`) option:

    ruby { 
      id => "<provide unique id>"
      #This path will obviously change based on where you store the file
      path => "/etc/logstash/ancillary/ruby-scripts/event-to-json.rb"
    }

Below is the Ruby filter code. Put this file someplace the Logstash process can read:

```ruby
require 'json'

# the value of `params` is the value of the hash passed to `script_params`
# in the logstash configuration
def register(params)
end

# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
  # What field contains the JSON you need to flatten into the Logstash event?
  # My example: remove newlines, then pull the JSON object out of the notes field.
  notes = event.get('notes')
  return [event] unless notes.is_a?(String)

  found = notes.gsub(/\n/, '').match(/\{.*\}/)
  return [event] unless found

  # Transform that JSON into a hash
  hash = JSON.parse(found[0])

  deep_traverse(hash) do |path, value|
    # If the value is a hash we have a nested JSON object; only leaves are set
    next if value.is_a?(Hash)

    # Wrap each key in brackets and join them into the field reference the
    # Logstash set method accepts ("[key1][key2][key3]")
    field = path.map { |k| "[#{k}]" }.join
    # Keep nil as nil, since nil.to_s would turn it back into ""
    event.set(field, value.nil? ? nil : value.to_s)
  end
  [event]
end

# Code Credit: https://stackoverflow.com/questions/8748475/iterate-over-a-deeply-nested-level-of-hashes-in-ruby/21432969
def deep_traverse(hash)
  stack = hash.map { |k, v| [[k], v] }
  until stack.empty?
    key, value = stack.pop
    yield(key, value)
    next unless value.is_a?(Hash)

    value.each do |k, v|
      # Use this section to do what you need to the key or value.
      # I am simply using it to change an empty string value into nil for this example.
      v = nil if v.is_a?(String) && v.empty?
      stack.push([key.dup << k, v])
    end
  end
end
```
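If you want to poke at the script without running a full pipeline, a small local harness can help. This is a sketch under assumptions: `StubEvent` and `flatten_into` are hypothetical names, and `StubEvent` only mimics the `get`/`set` calls the filter makes, storing each bracketed path as a flat key, whereas a real `LogStash::Event` builds nested fields from a reference like `[a][b]`.

```ruby
require 'json'

# Hypothetical stand-in for LogStash::Event: implements only get/set and
# stores each bracketed path (e.g. "[a][b]") as a flat hash key.
class StubEvent
  attr_reader :fields

  def initialize(fields = {})
    @fields = fields
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end
end

# Same stack-based traversal as the script above.
def deep_traverse(hash)
  stack = hash.map { |k, v| [[k], v] }
  until stack.empty?
    key, value = stack.pop
    yield(key, value)
    next unless value.is_a?(Hash)

    value.each do |k, v|
      v = nil if v.is_a?(String) && v.empty? # empty string -> nil
      stack.push([key.dup << k, v])
    end
  end
end

# Hypothetical helper mirroring the body of filter(event).
def flatten_into(event, field)
  text = event.get(field)
  found = text.is_a?(String) && text.gsub(/\n/, '').match(/\{.*\}/)
  return event unless found

  deep_traverse(JSON.parse(found[0])) do |path, value|
    next if value.is_a?(Hash) # only leaves become fields

    event.set(path.map { |k| "[#{k}]" }.join, value.nil? ? nil : value.to_s)
  end
  event
end

event = StubEvent.new('notes' => "Update:\n{\"a\": {\"b\": 1,\n \"c\": \"\"}}")
flatten_into(event, 'notes')
p event.fields
```

Swapping in one of your own `notes` strings is a quick way to see which bracketed fields the filter would try to set.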

For those not using the external file approach, see below. There might be some syntax issues, as I just quickly converted it over:

```
ruby {
  id => "<provide unique id>"
  # init runs once when the pipeline starts, so the helper is defined only once
  init => "
    require 'json'

    # Code Credit: https://stackoverflow.com/questions/8748475/iterate-over-a-deeply-nested-level-of-hashes-in-ruby/21432969
    def deep_traverse(hash)
      stack = hash.map { |k, v| [[k], v] }
      until stack.empty?
        key, value = stack.pop
        yield(key, value)
        next unless value.is_a?(Hash)

        value.each do |k, v|
          # Use this section to do what you need to the key or value.
          # I am simply using it to change an empty string value into nil for this example.
          v = nil if v.is_a?(String) && v.empty?
          stack.push([key.dup << k, v])
        end
      end
    end
  "
  # code runs for every event; keep double quotes out of it, since the
  # whole snippet is itself a double-quoted config string
  code => "
    # What field contains the JSON you need to flatten into the Logstash event?
    # My example: remove newlines, then pull the JSON object out of the notes field.
    notes = event.get('notes')
    found = notes.is_a?(String) ? notes.gsub(/\n/, '').match(/\{.*\}/) : nil

    if found
      deep_traverse(JSON.parse(found[0])) do |path, value|
        # If the value is a hash we have a nested JSON object; only leaves are set.
        next if value.is_a?(Hash)

        # Wrap each key in brackets and join them into a field reference like [key1][key2][key3].
        # Keep nil as nil, since nil.to_s would turn it back into an empty string.
        field = path.map { |k| '[' + k + ']' }.join
        event.set(field, value.nil? ? nil : value.to_s)
      end
    end
  "
}
```

Happy Stashing!


#2

OK, so it looks like the need to flatten arrays popped up. Note that this code will not work if your JSON object is an array at the top level. I updated my repl.it example as well.
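One way to cover the top-level array case might be to wrap the parsed array in a hash before handing it to `deep_traverse`, reusing the same index-as-key idea ("0", "1", ...) as the array handling below. This is a sketch only; `wrap_top_level` is a hypothetical helper name, and anything that is neither a hash nor an array is simply returned as `nil`.

```ruby
require 'json'

# Hypothetical helper: deep_traverse expects a Hash at the top, so a top-level
# JSON array is wrapped in a Hash keyed by element index ("0", "1", ...).
def wrap_top_level(parsed)
  case parsed
  when Hash then parsed
  when Array then parsed.each_with_index.map { |item, idx| [idx.to_s, item] }.to_h
  end # anything else (a bare string or number) falls through to nil
end

wrapped = wrap_top_level(JSON.parse('[{"id": 1}, {"id": 2}]'))
p wrapped
```

The filter would then call `deep_traverse(wrapped)` and skip the event when the helper returns `nil`.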

So this is what I ended up adding to the code:

```ruby
if v.is_a?(Array)
  # Key each element by its index ("0", "1", ...) so the elements stay unique
  v = v.each_with_index.map { |vx, idx| [idx.to_s, vx] }.to_h
end
```

This code sits inside the value.each do |k,v| loop:

```ruby
if value.is_a?(Hash)
  value.each do |k, v|
    if v.is_a?(Array)
      # Iterate through the array, using each element's index as its key so
      # the elements stay unique, and build a new hash from those pairs.
      v = v.each_with_index.map { |vx, idx| [idx.to_s, vx] }.to_h
    end

    # Use this section to do what you need to the key or value.
    # I am simply using it to change an empty string value into nil for this example.
    v = nil if v.is_a?(String) && v.empty?
    stack.push([key.dup << k, v])
  end
end
```

So something like this:

```json
{
  "build_validation":{
    "requests":[
      {"request_dt_tm":"2018-11-20T11:34:27","request_type":""},
      {"request_dt_tm":"2018-11-20T15:10:12","request_type":"CERT"},
      {"request_dt_tm":"2018-11-27T17:14:39","request_type":"PROD"},
      {"request_dt_tm":"2018-11-28T14:43:56","request_type":"PROD"}],
    "responses":[
      {"response":"FAIL","response_dt_tm":"2018-11-20T12:03:41","response_type":"CERT","response_id":"2702567.1","response_src":"CLIENT"},
      {"response":"SUCCESS","response_dt_tm":"2018-11-20T15:45:00","response_type":"CERT","response_id":"2703856.1","response_src":"CLIENT"},
      {"response":"FAIL","response_dt_tm":"2018-11-28T09:36:21","response_type":"PROD","response_id":"2720707.1","response_src":"CLIENT"},
      {"response":"SUCCESS","response_dt_tm":"2018-11-28T14:43:58","response_type":"PROD","response_src":"ASSOCIATE"}
    ]
  }
}
```

Becomes this:

```
[build_validation][responses][3][response_src] => ASSOCIATE
[build_validation][responses][3][response_type] => PROD
[build_validation][responses][3][response_dt_tm] => 2018-11-28T14:43:58
[build_validation][responses][3][response] => SUCCESS
[build_validation][responses][2][response_src] => CLIENT
[build_validation][responses][2][response_id] => 2720707.1
[build_validation][responses][2][response_type] => PROD
[build_validation][responses][2][response_dt_tm] => 2018-11-28T09:36:21
[build_validation][responses][2][response] => FAIL
[build_validation][responses][1][response_src] => CLIENT
[build_validation][responses][1][response_id] => 2703856.1
[build_validation][responses][1][response_type] => CERT
[build_validation][responses][1][response_dt_tm] => 2018-11-20T15:45:00
[build_validation][responses][1][response] => SUCCESS
[build_validation][responses][0][response_src] => CLIENT
[build_validation][responses][0][response_id] => 2702567.1
[build_validation][responses][0][response_type] => CERT
[build_validation][responses][0][response_dt_tm] => 2018-11-20T12:03:41
[build_validation][responses][0][response] => FAIL
[build_validation][requests][3][request_type] => PROD
[build_validation][requests][3][request_dt_tm] => 2018-11-28T14:43:56
[build_validation][requests][2][request_type] => PROD
[build_validation][requests][2][request_dt_tm] => 2018-11-27T17:14:39
[build_validation][requests][1][request_type] => CERT
[build_validation][requests][1][request_dt_tm] => 2018-11-20T15:10:12
[build_validation][requests][0][request_type] => 
[build_validation][requests][0][request_dt_tm] => 2018-11-20T11:34:27
```
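The same path format can also be produced recursively, handling nested hashes and arrays (including a top-level array) in a single pass. This is a swapped-in recursive approach, not the stack-based code from this thread; `flatten_paths` is a hypothetical name, and the sample below is a trimmed copy of the example JSON above. Unlike the stack version, it emits fields in document order and leaves values in their JSON types instead of calling `to_s`.

```ruby
require 'json'

# Recursive alternative to the stack-based deep_traverse: walks hashes and
# arrays alike, using array indexes as path segments like [responses][3].
def flatten_paths(node, path = [], out = {})
  case node
  when Hash
    node.each { |k, v| flatten_paths(v, path + [k.to_s], out) }
  when Array
    node.each_with_index { |v, idx| flatten_paths(v, path + [idx.to_s], out) }
  else
    # Leaf: empty strings become nil, as in the thread's example
    out[path.map { |seg| "[#{seg}]" }.join] = (node.is_a?(String) && node.empty?) ? nil : node
  end
  out
end

sample = JSON.parse(<<~SAMPLE)
  {"build_validation":{
    "requests":[{"request_dt_tm":"2018-11-20T11:34:27","request_type":""}],
    "responses":[{"response":"FAIL","response_src":"CLIENT"},
                 {"response":"SUCCESS","response_src":"ASSOCIATE"}]}}
SAMPLE

flat = flatten_paths(sample)
flat.each { |field, value| puts "#{field} => #{value}" }
```

Inside a filter, the resulting hash could then be applied with `flat.each { |field, value| event.set(field, value) }`.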

#3

With our needs met, I decided to keep developing this to hone my Ruby skills. I don't have the full Ruby filter file ready for a Logstash pipeline just yet, but I do have the repl.it code updated if you would like to review it and test your JSON object. I'll post the pipeline-ready Ruby filter file soon.

The updates:
1.) Now handles most JSON objects and their arrays (at least the ones I tested).
2.) Added the ability to turn array parsing on and off.
3.) Added the ability to define a root field, if wanted.
4.) Added exception handling.
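For the exception handling, one option is to isolate the extraction step so that a missing field, a failed regex match, or invalid JSON returns `nil` instead of raising. This is a sketch, not the author's code; `extract_json` is a hypothetical helper that keeps the thread's "strip newlines, grab the `{...}` span" approach.

```ruby
require 'json'

# Hypothetical helper: pull the first {...} span out of free text and parse it.
# Returns nil when the field isn't a string, nothing matches, or the JSON is
# invalid, so the caller can tag the event instead of letting the filter raise.
def extract_json(text)
  return nil unless text.is_a?(String)

  found = text.gsub(/\n/, '').match(/\{.*\}/)
  return nil unless found

  JSON.parse(found[0])
rescue JSON::ParserError
  nil
end

p extract_json(%(Update: {"status": "done"}))
p extract_json('no JSON here')
p extract_json('broken {"status": }')
```

In the filter, a `nil` result is where the event could be tagged, e.g. `event.tag('_json_match_failure')` (the tag name is made up). Note that the greedy `.*` spans from the first `{` to the last `}`, so text containing two separate JSON objects should fail to parse and land in that same `nil` branch.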

Please note that Logstash has a few ways to deal with JSON data (e.g. codecs, the json filter). Those should cover the vast majority of use cases, and there is no need to go this complex.

Check out the code; there are too many characters to post it here.


#4

As promised:

Latest ruby filter code: gist link
Example pipeline usage:

```
ruby {
  path => "/path-to-file/json-to-event.rb"
  script_params => {
    "json_field" => "notes"         # field you want to extract the JSON from (default: message)
    "array" => true                 # flatten arrays within the JSON? (default: false)
    "target" => "parent"            # root-level field name, if wanted (default: root of document)
    "tag_match_failure" => true     # tag the event when the JSON regex match fails? (default: false)
  }
}
```
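A script file's `register` method could read these `script_params` and apply the defaults listed above. This is a hypothetical sketch, not the gist's code: the instance variable names, `truthy?` and `field_name` are illustrative, and it accepts both `true` and `"true"` because which form reaches the script is an assumption here.

```ruby
# Hypothetical sketch: Logstash calls register(params) once with the
# script_params hash; the values are kept in instance variables for filter.
def register(params)
  @json_field        = params.fetch('json_field', 'message')
  @flatten_arrays    = truthy?(params.fetch('array', false))
  @target            = params['target']
  @tag_match_failure = truthy?(params.fetch('tag_match_failure', false))
end

# Accept a real boolean or the string "true" (which form arrives is assumed).
def truthy?(value)
  value == true || value.to_s.downcase == 'true'
end

# Build the bracketed field reference, nesting it under @target when set.
def field_name(path)
  segments = @target ? [@target] + path : path
  segments.map { |seg| "[#{seg}]" }.join
end

register('json_field' => 'notes', 'array' => true, 'target' => 'parent')
puts field_name(%w[build_validation requests 0 request_type])
```

With `target` set, every flattened field lands under `[parent]`; leaving it out keeps the fields at the root of the document.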


#5

Just wanted to stop by and say thank you. I have a feeling I might have a use for this later on!


#6

Good to hear! I hope you get some use out of it.

