Parse JSON array to flat JSON with filter, or alternative solution

Hi,

I'm currently using Log4j and Kafka to send logs to ELK. I've stumbled upon an issue for which, strangely, I can't find any previous solution, so I am perhaps misusing the system. After reading a few dozen topics I am still a bit lost, so apologies if this is the wrong place or a newbie question.

With the following Kafka appender and JsonLayout configured in Log4j2:

    <Kafka name="Kafka" topic="logstash_logs" ignoreExceptions="false">
        <JsonLayout properties="true"/>
        <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>

the following document ends up in Elasticsearch:

    {
      "timeMillis" : 1467217177090,
      "thread" : "[ACTIVE] ExecuteThread: '18' for queue: 'weblogic.kernel.Default (self-tuning)'",
      "level" : "DEBUG",
      "message" : "Request finished",
      "endOfBatch" : true,
      "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
      "contextMap" : [
        { "key" : "remoteUser" },
        { "key" : "hostName", "value" : "localhost" },
        { "key" : "durationNanos", "value" : "759471" },
        { "key" : "method", "value" : "GET" },
        { "key" : "endNanos", "value" : "202739492421968" },
        { "key" : "ipAddress", "value" : "127.0.0.1" },
        { "key" : "sessionId" },
        { "key" : "queryString" },
        { "key" : "requestId", "value" : "834cc6e2-1d6b-4771-8b15-bf3fe266d916" },
        { "key" : "servletPath", "value" : "/index.html" },
        { "key" : "startNanos", "value" : "202739491662497" }
      ]
    }

In Kibana I am having a hard time creating visualizations. I would like, for instance, to build a visualization per field, e.g. count the number of GET, POST, etc. requests. Do I need to flatten this structure?

I read that Kibana 4 does not support JSON arrays. Is there a way to get at the values, or should I clean up the structure and make it flatter, like the example below?

Example:

    {
      "timeMillis" : 1467217177090,
      "thread" : "[ACTIVE] ExecuteThread: '18' for queue: 'weblogic.kernel.Default (self-tuning)'",
      "level" : "DEBUG",
      "message" : "Request finished",
      "endOfBatch" : true,
      "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
      "hostName" : "localhost",
      "durationNanos" : "759471",
      "method" : "GET",
      "requestId" : "834cc6e2-1d6b-4771-8b15-bf3fe266d916"
    }

I have tried to use Stephen Goodall's code, but it throws an exception while parsing:

    Pipeline main started
    Error parsing json {:source=>"contextMap", :raw=>"{\"remoteUser\",\"hostName\":\"localhost\",\"method\":\"GET\",\"requestId\":\"06fe3cb7-7c0a-451b-955b-3c8a44e18ffc\",\"requestURL\":\"http://localhost:7001/test-log4j2-kafka-elk-jetty-1.0-SNAPSHOT/index.html\",\"servletPath\":\"/index.html\",\"ipAddress\":\"127.0.0.1\",\"contextPath\":\"/test-log4j2-kafka-elk-jetty-1.0-SNAPSHOT\",\"sessionId\",\"queryString\",\"startNanos\":\"202538443760806\"}", :exception=>#<LogStash::Json::ParserError: Unexpected character (',' (code 44)): was expecting a colon to separate field name and value
     at [Source: [B@2473d2b3; line: 1, column: 15]>, :level=>:warn}

Any pointers on how to properly use ELK would be much appreciated. I am now looking at doing this with Ruby code (I have no experience in Ruby!) and then trying to create a filter with that... but perhaps I am missing something and there is a simpler way?

To flatten the array I can do:

    # 'json' here is the event above, already parsed into a Hash
    h = Hash.new(nil)
    json["contextMap"].each { |element|
      h[element["key"]] = element["value"] unless element["value"].nil?
    }
    puts h.to_json
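
Run standalone against a cut-down copy of the contextMap above (just a quick sketch outside Logstash, with the values taken from the event shown earlier), it does what I want:

    require 'json'

    # cut-down copy of the contextMap from the event above; remoteUser has no value on purpose
    event = JSON.parse('{"contextMap":[{"key":"remoteUser"},{"key":"hostName","value":"localhost"},{"key":"method","value":"GET"}]}')

    h = Hash.new(nil)
    event["contextMap"].each { |element|
      h[element["key"]] = element["value"] unless element["value"].nil?
    }
    puts h.to_json   # => {"hostName":"localhost","method":"GET"}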

But I am not sure how to turn that into a Logstash filter...

Use a ruby filter. Untested:

    ruby {
      code => "
        event['contextMap'].each { |kv|
          event[kv['key']] = kv['value'] unless kv['value'].nil?
        }
      "
    }
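
A side note, equally untested: if you are on Logstash 5.x or later, the ruby filter can no longer read and write fields with event['field'] and has to go through the event get/set API instead, so the equivalent would be something like:

    ruby {
      code => "
        # same idea, but using the event get/set API required on Logstash 5.x and later
        (event.get('contextMap') || []).each { |kv|
          event.set(kv['key'], kv['value']) unless kv['value'].nil?
        }
      "
    }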

I think you already know that it throws the exception because of the following (just including this in case it's useful for others):
{ "key" : "remoteUser" }

has no corresponding value, so once my ruby filter converted it, it became:
{\"remoteUser\",\"hostName\":\"localhost\",\"method\":\"GET\",\"requestId\":\"06fe3cb7-7c0a-451b-955b-3c8a44e18ffc\",\"requestURL\":\"http://localhost:7001/test-log4j2-kafka-elk-jetty-1.0-SNAPSHOT/index.html\",\"servletPath\":\"/index.html\",\"ipAddress\":\"127.0.0.1\",\"contextPath\":\"/test-log4j2-kafka-elk-jetty-1.0-SNAPSHOT\",\"sessionId\",\"queryString\",\"startNanos\":\"202538443760806\"}"

Which, if you remove all the escape characters, is the same as:
{"remoteUser","hostName":"localhost","method":"GET","requestId":"06fe3cb7-7c0a-451b-955b-3c8a44e18ffc","requestURL":"http://localhost:7001/test-log4j2-kafka-elk-jetty-1.0-SNAPSHOT/index.html","servletPath":"/index.html","ipAddress":"127.0.0.1","contextPath":"/test-log4j2-kafka-elk-jetty-1.0-SNAPSHOT","sessionId","queryString","startNanos":"202538443760806"}"

Which is no longer valid JSON because of the entries with no value, e.g. remoteUser, sessionId and queryString.

I had to change the code that creates/populates the MDC fields so that it only adds fields that actually have values; that way the JSON formed after my Ruby filter had converted it was valid.

Hopefully the Ruby filter that Magnus posted above will skip those empty entries for you! Let us all know if it works :smiley:


Thank you both!

I have it running, just like I wanted. I will check the performance impact next :smiley: Here is the full pipeline config:

    input {
      stdin { }
    }

    filter {
      # parse the raw JSON log event
      json { source => "message" }

      # promote each contextMap key/value pair to a top-level field
      if [contextMap] {
        ruby {
          code => "
            event['contextMap'].each { |kv|
              event[kv['key']] = kv['value'] unless kv['value'].nil?
            }
          "
        }
      }

      # join the extendedStackTrace entries into one string, rewrite '=>' to ':' and re-parse as JSON
      if [thrown] {
        ruby {
          code => "
            temp = event['thrown']['extendedStackTrace']
            event['thrown']['extendedStackTrace'] = temp.join('')
          "
        }
        mutate {
          gsub => ['[thrown][extendedStackTrace]', '=>', ':']
        }
        json {
          source => '[thrown][extendedStackTrace]'
          target => '[thrown][extendedStackTrace]'
        }
      }

      # use timeMillis as the event timestamp
      date {
        match => [ "timeMillis", "UNIX_MS" ]
        timezone => "Europe/London"
      }

      # drop the original contextMap array now that it is flattened
      mutate {
        remove_field => ["contextMap"]
      }
    }

    output {
      stdout { codec => rubydebug }
    }
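
For anyone else trying this: I just start Logstash with this config (something like bin/logstash -f followed by the path where you saved the config above), paste one of the raw JSON events onto stdin, and the rubydebug output shows hostName, method, durationNanos, requestId, etc. as top-level fields, with the original contextMap removed.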