Logstash - how to flatten a JSON array with the ruby filter?

I have a log file in JSON format that contains JSON arrays; one such array entry is shown below. I am using the json codec plus a ruby filter to parse and flatten the array elements, but my filter does not seem to work. Could an expert please take a look?

1. array

body {
    "Action": "TalentSearch",
    "name": "Shanghai",
    "ratingCriteria": [{
        "id": "sysOverallPotential",
        "type": "7",
        "name": "Potential - 3x3 Rating",
        "item": "null",
        "scaleId": "Potential",
        "scaleMin": "1",
        "scaleMax": "3",
        "criterias": [{
            "id": "tsv2RatingValidFrom",
            "type": "date",
            "value": ["2018 - 12 - 04", "2018 - 12 - 31"]
        }, {
            "id": "tsv2RatingValidTo",
            "type": "date",
            "value": ["2018 - 12 - 31", "2018 - 12 - 31"]
        }, {
            "id": "tsv2RatingFromValue",
            "type": "prepopulate",
            "value": ["1.0"]
        }, {
            "id": "tsv2RatingEndValue",
            "type": "prepopulate",
            "value": ["2.0"]
        }]
    }]
}

2. my json+ruby filter (the field names are dynamic, so I use ruby to iterate over them)

input {
  file {
    path => "C:/elkstack/elasticsearch-6.5.1/logs/app.log"
    start_position => "beginning"
    sincedb_path => "null"
    codec => "json"
  }
}
filter {
  ruby {
    code => "event.to_hash.each { |k, v|
      if v.is_a?(Array)
        v.each do |element|
          if element.is_a?(Hash)
            element.each { |k, v| event.set(k, v.split(',')) }
          else
            event.set(k, v)
          end
        end
      else
        event.set(k, v)
      end
    }"
  }
}

4. actual parsed result

{
  "id": "sysOverallPotential",
  "item": "null",
  "scaleId": "Potential",
  "type": "7",
  "scaleMax": "3",
  "criterias": [
    {
      "id": "tsv2RatingValidFrom",
      "type": "date",
      "value": [
        "2018 - 12 - 04",
        "2018 - 12 - 31"
      ]
    },
    {
      "id": "tsv2RatingValidTo",
      "type": "date",
      "value": [
        "2018 - 12 - 31",
        "2018 - 12 - 31"
      ]
    },
    {
      "id": "tsv2RatingFromValue",
      "type": "prepopulate",
      "value": [
        "1.0"
      ]
    },
    {
      "id": "tsv2RatingEndValue",
      "type": "prepopulate",
      "value": [
        "2.0"
      ]
    }
  ],
  "name": "Potential - 3x3 Rating",
  "scaleMin": "1"
}

5. my expected parsed result, displayed in Kibana with fully flattened field names

          "body.ratingCriteria.id": "sysOverallPotential"
          "body.ratingCriteria.type": "7"
          ...
          "body.ratingCriteria.criterias.id": "tsv2RatingValidFrom"
          "body.ratingCriteria.criterias.type": "date"
          ...

So it looks like you want to flatten some fields of your event.

A Logstash event can have scalar, array or hash values in any root level field.

The body field's value is a hash. You want to be flattening that, I guess.

Question: As shown in your desired "shape" the flattened key is a dotted accumulation of the parent keys, correct?
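To make the "dotted accumulation" concrete, here is a minimal plain-Ruby sketch of that kind of flattening. The helper names flatten_dotted and add_value are hypothetical, it operates on an ordinary Hash rather than a Logstash event, and the choice to collect repeated keys (for example the several criterias ids) into one list is an assumption about the desired shape, not something confirmed in this thread.

```ruby
# Hypothetical helper: store a value under a dotted key; if the key already
# holds something (e.g. a second array element), collect the values in a list.
def add_value(out, key, value)
  if out.key?(key)
    out[key] = Array(out[key]) + Array(value)
  else
    out[key] = value
  end
end

# Hypothetical helper: walk nested hashes and arrays of hashes, building
# dotted keys from the parent keys. Arrays of scalars are kept as values.
def flatten_dotted(value, prefix, out = {})
  case value
  when Hash
    value.each do |k, v|
      flatten_dotted(v, prefix.empty? ? k.to_s : "#{prefix}.#{k}", out)
    end
  when Array
    if value.any? { |e| e.is_a?(Hash) }
      value.each { |e| flatten_dotted(e, prefix, out) }
    else
      add_value(out, prefix, value)
    end
  else
    add_value(out, prefix, value)
  end
  out
end

# Shortened version of the sample document from the question.
sample = {
  "body" => {
    "name" => "Shanghai",
    "ratingCriteria" => [{
      "id" => "sysOverallPotential",
      "criterias" => [
        { "id" => "tsv2RatingFromValue", "value" => ["1.0"] },
        { "id" => "tsv2RatingEndValue", "value" => ["2.0"] }
      ]
    }]
  }
}

flat = flatten_dotted(sample, "")
flat.each { |key, value| puts "#{key}: #{value.inspect}" }
```

Inside a ruby filter the same idea could presumably be applied to event.get("body"), calling event.set for each resulting pair, but that wiring is untested here.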

Hi guyboertje,

Thanks for your attention to this issue.

1. The body field's value is a hash. You want to be flattening that, I guess.
Yes. Guess is right.

2. Question : As shown in your desired "shape" the flattened key is a dotted accumulation of the parent keys, correct?
Yes.

I can't figure out what is wrong with my json+ruby filter. I use json for the first parse, then the ruby filter to continue parsing the array cases, which I want flattened.

You can take a look at my ruby code, perhaps it will help you understand yours. Or you can just use the filter.

Without knowing what you want in the end result, you could try:

filter {
   split { 
       field => "criterias"
   }
}

It may give you 4 events from the example provided.

Hi bloke - Thank you, but the field name is dynamic, not static.

@cheriemilk

Ahh I understand better now.

One solution is to take advantage of field interpolation in the mutate/rename then use the split filter.
In this test config below, I used the json filter - but you can use the file input and json codec as before. This test config only works for two levels of split, at ratingCriteria and criterias.

input {
  generator {
    message => '{"body":{"Action":"TalentSearch","name":"Shanghai","ratingCriteria":[{"id":"sysOverallPotential","type":"7","name":"Potential - 3x3 Rating","item":"null","scaleId":"Potential","scaleMin":"1","scaleMax":"3","criterias":[{"id":"tsv2RatingValidFrom","type":"date","value":["2018 - 12 - 04","2018 - 12 - 31"]},{"id":"tsv2RatingValidTo","type":"date","value":["2018 - 12 - 31","2018 - 12 - 31"]},{"id":"tsv2RatingFromValue","type":"prepopulate","value":["1.0"]},{"id":"tsv2RatingEndValue","type":"prepopulate","value":["2.0"]}]}]}}'
    count => 1
  }
}

filter {
  json {
    source => "message"
  }
  ruby {
    code => '
      event.to_hash.each do |key, value|
        if value.is_a?(Hash)
          value.each do |field, child|
            if child.is_a?(Array)
              event.set("renameable_field", "[#{key}][#{field}]")
              break
            end
          end
        end
      end
    '
  }
  if [renameable_field] {
    mutate {
      rename => {"[%{renameable_field}]" => "field_that_needs_splitting"}
    }
    split {
      field => "[field_that_needs_splitting]"
    }
    mutate {
      rename => {"field_that_needs_splitting" => "[%{renameable_field}]"}
    }
    ruby {
      code => '
        renameable_field = event.remove("renameable_field")
        inner = event.get(renameable_field)
        if inner.is_a?(Hash)
          inner.each do |field, child|
            if child.is_a?(Array)
              event.set("renameable_field", renameable_field + "[#{field}]")
              break
            end
          end
        end
      '
    }
    if [renameable_field] {
      mutate {
        rename => {"[%{renameable_field}]" => "field_that_needs_splitting"}
      }
      split {
        field => "[field_that_needs_splitting]"
      }
      mutate {
        rename => {"field_that_needs_splitting" => "[%{renameable_field}]"}
      }
    }
  }
}

output {
  stdout { codec => rubydebug }
}

Thank you.

I am trying to read and understand it, but the configuration is a bit more complex than I can follow. Could you please explain it a bit?

Thanks,
Cherie

I'll try.

Generally the problem is described as:
I received a single document that contains multiple "metrics" with some higher-level metadata that is relevant to each inner metric. How do I create individual metric documents that also hold the relevant metadata?

In general, we need to use the split filter to create these individual documents. However, the split filter has a "hard coded" field to split on, but yours is unknown when editing the config or when LS starts.

To solve this, we need to use the ruby filter to find this dynamic field. I picked the first field in the event that is an array of inner documents, but some other criteria could be used. Once the field is found, we need to make a "memo" of the field name: event.set("renameable_field", "[#{key}][#{field}]").
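The lookup step can be tried outside Logstash with a plain Hash standing in for event.to_hash. This is only a sketch: first_array_field is a hypothetical name, and unlike the filter code it returns at the very first match instead of writing a memo field on the event.

```ruby
# Hypothetical helper mirroring the first ruby filter: return a Logstash-style
# field reference for the first array found one level below the root,
# or nil when no such array exists.
def first_array_field(event_hash)
  event_hash.each do |key, value|
    next unless value.is_a?(Hash)
    value.each do |field, child|
      return "[#{key}][#{field}]" if child.is_a?(Array)
    end
  end
  nil
end

# Stand-in for the parsed event; only the shape matters here.
event_hash = {
  "message" => "raw json line",
  "body" => {
    "Action" => "TalentSearch",
    "ratingCriteria" => [{ "id" => "sysOverallPotential" }]
  }
}

puts first_array_field(event_hash).inspect
```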

We then use the mutate/rename function to rename the field to the one we hard coded in the split filter settings: rename => {"[%{renameable_field}]" => "field_that_needs_splitting"}. Interpolation is used on the LHS.

We then use the split filter to create the multiple net new events.
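As a rough mental model of what the split step produces, the following plain-Ruby sketch turns one hash into one hash per array element while copying the other fields. split_on is a hypothetical stand-in, not the filter's implementation, and it leaves out whatever else the real split filter does (string fields, for example).

```ruby
# Hypothetical stand-in for the split filter: one new hash per array element,
# with every other top-level field copied unchanged.
def split_on(event_hash, field)
  elements = event_hash[field]
  return [event_hash] unless elements.is_a?(Array)
  elements.map { |element| event_hash.merge(field => element) }
end

event_hash = {
  "meta" => "shared metadata",
  "field_that_needs_splitting" => [{ "id" => "first" }, { "id" => "second" }]
}

events = split_on(event_hash, "field_that_needs_splitting")
events.each { |e| puts e.inspect }
```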

We then use the mutate/rename function again on each new event to rename the hard coded field back to the one we found: rename => {"field_that_needs_splitting" => "[%{renameable_field}]"}.

However, because your individual metrics are two levels deep, we have to do the whole process a second time for the inner inner array of metrics.
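The find-then-split cycle can be repeated until no array of inner documents is left, which is what the config does by hand for two levels. The sketch below shows that idea in plain Ruby for any depth. It is an illustration on ordinary hashes, not a Logstash config; the names find_array_path, deep_copy and split_all are hypothetical.

```ruby
# Find the path (list of keys) to the first array that contains hashes,
# searching depth first. Returns nil when there is none.
def find_array_path(node, path = [])
  return nil unless node.is_a?(Hash)
  node.each do |key, value|
    here = path + [key]
    return here if value.is_a?(Array) && value.any? { |e| e.is_a?(Hash) }
    deeper = find_array_path(value, here)
    return deeper if deeper
  end
  nil
end

# Deep copy via Marshal so the split documents do not share nested hashes.
def deep_copy(obj)
  Marshal.load(Marshal.dump(obj))
end

# Split on the first array of hashes, then recurse on every resulting document.
def split_all(doc)
  path = find_array_path(doc)
  return [doc] if path.nil?
  parent = path[0...-1].reduce(doc) { |h, k| h[k] }
  parent[path.last].flat_map do |element|
    copy = deep_copy(doc)
    copy_parent = path[0...-1].reduce(copy) { |h, k| h[k] }
    copy_parent[path.last] = element
    split_all(copy)
  end
end

doc = {
  "body" => {
    "name" => "Shanghai",
    "ratingCriteria" => [{
      "id" => "sysOverallPotential",
      "criterias" => [
        { "id" => "tsv2RatingFromValue", "value" => ["1.0"] },
        { "id" => "tsv2RatingEndValue", "value" => ["2.0"] }
      ]
    }]
  }
}

docs = split_all(doc)
docs.each { |d| puts d.inspect }
```

Arrays of plain values, such as value, are left alone, so only arrays of inner documents multiply the number of results.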

Hi guyboertje

Thank you very much for your explanation and patience. I understand now.

Regards,
Cherie

Good luck.
