We use Logstash extensively within my team as our ETL engine. Things started out simple enough with log file data, but we are now at a point where our primary use case is centered around Business Intelligence. All of our Incident, Change, Problem, Knowledge, etc. tickets are pushed through our pipelines, where we perform extensive ETL to help speed things up on the reporting end. Because of the dynamic nature of the data, the ruby filter has become our friend: developing standalone plugins has too much overhead for the team, and it just works so dang well.
Anyway, one of our use cases requires pulling working logs from our tickets that contain JSON objects. Our model has us denormalizing as much data as possible into the parent (i.e. incident, change) record, so we needed a way to flatten those JSON objects into the parent document and to manipulate the data beforehand. There is a JSON filter, but it is very basic and doesn't handle nested objects very well. After researching solutions on the interwebs, I was able to put together a nicely portable ruby filter and wanted to share it.
Be forewarned: I am not a coder and I am sure improvements can be made, but if it saves you some time then mission accomplished! Also, it isn't going to flatten your array objects, but it should be easy enough to update the code to accommodate that need.
FYI I am running Logstash 6.4
I shared the code on repl.it so you can test it out for your needs. Has an example JSON...
Link to repl.it code for testing
Below is the pipeline config example, for those not familiar with using the new script file option:
ruby {
  id => "<provide unique id>"
  # This path will obviously change based on where you store the file
  path => "/etc/logstash/ancillary/ruby-scripts/event-to-json.rb"
}
Below is the Ruby Filter code. Put this file someplace the logstash process can read:
require 'json'

# the value of `params` is the value of the hash passed to `script_params`
# in the logstash configuration
def register(params)
end

# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
  # Which field contains the JSON you need to flatten into the Logstash event? You need to extract the proper JSON...
  # Left my example: removing newlines and then pulling out the JSON from the notes field
  notes = event.get('notes')
  match = notes.is_a?(String) ? notes.gsub(/\n/, '').match(/{.*}/) : nil
  # No notes field, or no JSON found in it: pass the event through untouched
  return [event] if match.nil?
  # Transform that JSON into a hash
  hash = JSON.parse(match[0])
  deep_traverse(hash) do |path, value|
    # A hash value is a nested JSON object, so only non-hash (leaf) values become fields
    unless value.is_a?(Hash)
      # Wrap each key in the path so it's in a format that the Logstash set method will accept ("[field]")
      key_path = path.map { |k| '[' + k + ']' }
      # Join the key_path array to construct the field name used in the set method ("[key1][key2][key3]")
      # and add the field to the Logstash event. Note that to_s stores every value as a string (nil becomes "").
      event.set(key_path.join, value.to_s)
    end
  end
  return [event]
end
# Code Credit: https://stackoverflow.com/questions/8748475/iterate-over-a-deeply-nested-level-of-hashes-in-ruby/21432969
def deep_traverse(hash, &block)
  stack = hash.map { |k, v| [[k], v] }
  until stack.empty?
    key, value = stack.pop
    yield(key, value)
    if value.is_a?(Hash)
      value.each do |k, v|
        # Use this section to do what you need to the key or value
        # I am simply using it to change an empty string value into a nil for this example
        v = nil if v.is_a?(String) && v.empty?
        stack.push [key.dup << k, v]
      end
    end
  end
end
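Since the script above skips arrays, here is a minimal sketch of one way the traversal could be extended to cover them. It is not part of the original filter: `flatten_leaves` and `to_field_reference` are hypothetical helper names, and the sample JSON is made up. It runs in plain Ruby without Logstash and collects the results into an ordinary hash instead of calling `event.set`. Each array element gets its index as an extra path segment (for example `[user][tags][0]`); how `event.set` treats a numeric segment on a field that does not exist yet is something to verify on your Logstash version before relying on it.

```ruby
require 'json'

# Hypothetical helper (not from the original post): walks hashes AND arrays,
# yielding [path, value] for every leaf value. Array elements use their
# index (as a string) as the path segment.
def flatten_leaves(node, path = [], &block)
  case node
  when Hash
    node.each { |k, v| flatten_leaves(v, path + [k.to_s], &block) }
  when Array
    node.each_with_index { |v, i| flatten_leaves(v, path + [i.to_s], &block) }
  else
    yield(path, node)
  end
end

# Hypothetical helper: builds a Logstash-style field reference,
# e.g. ["user", "name"] becomes "[user][name]".
def to_field_reference(path)
  path.map { |segment| "[#{segment}]" }.join
end

# Made-up sample document with one nested object and one array
sample = JSON.parse('{"user":{"name":"ann","tags":["a","b"]},"count":2}')

# Collect into a plain hash; inside the filter this would be event.set(field, value)
flattened = {}
flatten_leaves(sample) { |path, value| flattened[to_field_reference(path)] = value }

flattened.each { |field, value| puts "#{field} => #{value.inspect}" }
```

Unlike the stack-based version, this sketch is recursive and leaves values in their original types, so add a `to_s` or your own empty-string handling in the block if you need it.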
For those not using the external file approach see below. Might be some syntax issues as I just quickly converted it over:
ruby {
  id => "<provide unique id>"
  code => "
    require 'json'
    # Code Credit: https://stackoverflow.com/questions/8748475/iterate-over-a-deeply-nested-level-of-hashes-in-ruby/21432969
    def deep_traverse(hash, &block)
      stack = hash.map { |k, v| [[k], v] }
      until stack.empty?
        key, value = stack.pop
        yield(key, value)
        if value.is_a?(Hash)
          value.each do |k, v|
            # Use this section to do what you need to the key or value
            # I am simply using it to change an empty string value into a nil for this example
            v = nil if v.is_a?(String) && v.empty?
            stack.push [key.dup << k, v]
          end
        end
      end
    end
    # Which field contains the JSON you need to flatten into the Logstash event? You need to extract the proper JSON...
    # Left my example: removing newlines and then pulling out the JSON from the notes field
    notes = event.get('notes')
    match = notes.is_a?(String) ? notes.gsub(/\n/, '').match(/{.*}/) : nil
    # Only flatten when the notes field actually contains JSON
    if match
      # Transform that JSON into a hash
      hash = JSON.parse(match[0])
      deep_traverse(hash) do |path, value|
        # A hash value is a nested JSON object, so only non-hash (leaf) values become fields
        unless value.is_a?(Hash)
          # Wrap each key in the path so it's in a format that the Logstash set method will accept ('[field]')
          # (comments inside this block must not use double quotes, since they would end the code string)
          key_path = path.map { |k| '[' + k + ']' }
          # Join the key_path array to construct the field name used in the set method ('[key1][key2][key3]')
          # and add the field to the Logstash event.
          event.set(key_path.join, value.to_s)
        end
      end
    end
  "
}
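Both versions depend on the same extraction step: strip newlines from the notes text, then grab everything from the first `{` to the last `}`. The sketch below isolates that step so you can try it against your own note text in plain Ruby. `extract_json` is a hypothetical helper name and the sample strings are made up. It also shows one limitation of the greedy pattern: when a note contains two separate JSON objects, the match spans both and the parse fails.

```ruby
require 'json'

# Hypothetical helper (not from the original post): returns the parsed Hash,
# or nil when the text is missing, contains no {...} span, or the span
# is not valid JSON.
def extract_json(text)
  return nil unless text.is_a?(String)
  # Same approach as the filter: drop newlines, then match greedily
  # from the first '{' to the last '}'
  match = text.gsub(/\n/, '').match(/\{.*\}/)
  return nil unless match
  JSON.parse(match[0])
rescue JSON::ParserError
  nil
end

# Made-up note text with JSON spread over several lines
note = "Work log entry\n{\"status\":\"open\",\n\"owner\":{\"name\":\"ann\"}}\ntrailing text"
p extract_json(note)            # parsed hash with "status" and nested "owner"

p extract_json('no json here')  # nil: nothing between braces
p extract_json(nil)             # nil: field missing

# Greedy match spans both objects, which is not valid JSON, so this is nil too
p extract_json('first {"a":1} and second {"b":2}')
```

Returning nil here is just one choice. In the filter you could instead let the error raise, since the ruby filter tags events whose code fails, or add your own tag so the bad notes are easy to find later.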
Happy Stashing!