Generating events from JSON array inside root-level object

I'm trying to import data from a JSON file.

The root-level item in the file is an object containing two keys. One of those keys, datapoints, contains an array of objects that I would like to use as events.

I cannot find a way to tell Logstash (or Filebeat, if there's a mechanism in there) to generate events from the array under the datapoints key.

Here is an example of the JSON file I'm receiving:

{
  "messages": [
    "message 1",
    "message 2"
  ],
  "datapoints": [
    {"key1": "value 1", "key2": "value 4"},
    {"key1": "value 2", "key2": "value 5"},
    {"key1": "value 3", "key2": "value 6"}
  ]
}

I would like to extract the array inside the datapoints key and treat each object in the array as an event, as if that array was the only item in the file. I am not concerned with the messages key or its value at all.

If I manually replace the contents of the file with the array inside the datapoints key, resulting in the file contents below, the processing goes ahead mostly fine (although I get a validation error on the first item in the array - different issue though!).

[
  {"key1": "value 1", "key2": "value 4"},
  {"key1": "value 2", "key2": "value 5"},
  {"key1": "value 3", "key2": "value 6"}
]

This generates the desired output:

{
  "key1": "value 1",
  "key2": "value 4"
}
{
  "key1": "value 2",
  "key2": "value 5"
}
{
  "key1": "value 3",
  "key2": "value 6"
}

In this case, I'm using a simple pipeline like this:

input {
  file {
    path => "myjsonfile.json"
    start_position => "beginning"
    codec => "json"
  }
}
output {
  stdout { codec => rubydebug }
}

Is there a way to have Logstash use the array when it's inside an object, as in the first example, so that I don't have to write something to modify the file contents before they go to Logstash?

Thanks in advance.

Use a split filter

    mutate { remove_field => [ "messages" ] }
    split { field => "datapoints" }

You can move the fields to the root level using the approach described in How to dynamically move nested key value to root level.
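
One way to do that move to the root level is a ruby filter. A rough sketch, assuming it runs after the split so that [datapoints] holds a single hash per event:

    ruby {
      code => '
        # [datapoints] is now one element of the original array, e.g. {"key1" => "value 1", "key2" => "value 4"}
        event.get("datapoints").each { |k, v| event.set(k, v) }
        event.remove("datapoints")
      '
    }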

Thanks so much for that. It looks like it should do the trick but I'm getting errors when using it. Logstash is spitting out these warnings:

[WARN ][logstash.filters.split ] Only String and Array types are splittable. field:datapoints is of type = NilClass

Despite the warnings, a bunch of events still come through. They all contain a tags field with a _split_type_failure and some also have a _jsonparsefailure. The warnings sound like Logstash can't find the datapoints key, but I've double-checked my JSON file and pipeline and it's certainly the correct key. Is there something I'm missing?

{
  "message" => "  \"datapoints\": [\r",
  "path" => "myfile.json",
  "tags" => [
      [0] "_jsonparsefailure",
      [1] "_split_type_failure"
  ],
  "@timestamp" => 2019-06-25T19:29:15.235Z,
  "@version" => "1",
  "host" => "mymachine.local"
}

The expected fields and values are in a few of the events, like this one:

{
  "path" => "myfile.json",
  "key1" => "value 1",
  "tags" => [
    [0] "_split_type_failure"
  ],
  "key2" => "value 4",
  "@timestamp" => 2019-06-25T19:29:15.240Z,
  "@version" => "1",
  "host" => "mymachine.local"
}

My pipeline looks like this:

input {
  file {
    path => "myfile.json"
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  mutate {
    remove_field => ["messages"]
  }
  split {
    field => "datapoints"
  }
}

output {
  stdout { codec => rubydebug }
}

Are the _jsonparsefailure messages because of the file encoding or line endings?

Apologies if I'm making an obvious mistake. I'm not new to Elasticsearch but I'm brand new to Logstash.

A file input reads a file and creates one event for each line of the file. That's great for logs, not so good for JSON. Use a multiline codec. If you want to consume an entire file as a single event, match a pattern that never occurs and use a timeout:

codec => multiline { 
    pattern => "^Spalanzani"
    negate => true
    what => previous
    auto_flush_interval => 1
}

Thanks so much. I managed to get it generating the events just right.

With that said, I completely understand why this is necessary, but it smells a bit funny to me. Admittedly I'm naive when it comes to Logstash, and doing this may well be acceptable, but I may spend the time cleaning up and simplifying the data structure before it gets to Logstash.

The source files I receive can exceed 100k lines, so JSON obviously isn't the ideal format anyway. That's out of my hands, however. What I can do is convert it to CSV and send that to Logstash.
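
If I do go the CSV route, I imagine the pipeline would reduce to something like this rough sketch (the path and column names are just placeholders based on the example data above):

input {
  file {
    path => "mydatapoints.csv"        # placeholder path
    start_position => "beginning"
  }
}

filter {
  csv {
    separator => ","
    columns => ["key1", "key2"]       # placeholder column names from the example
  }
}

output {
  stdout { codec => rubydebug }
}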

For the record, this is the pipeline I used to parse the JSON file I started with:

input {
  file {
    path => "myfile.json"
    start_position => "beginning"
    codec => multiline {
      pattern => "^Spalanzani"
      negate => "true"
      what => "previous"
      auto_flush_interval => 1
    }
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => ["messages", "message"]
  }
  split {
    field => "datapoints"
  }
}

output {
  stdout { codec => rubydebug }
}

You want to unconditionally remove [messages], so mutate+remove_field is appropriate. However, for filters that parse fields (like json or xml) I would make the removal of the [message] field conditional upon it having been successfully parsed. You can do that using:

json {
    source => "message"
    remove_field => [ "message" ]
}

The common options like remove_field are only applied if the filter executes successfully.
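
Putting that together, the filter section of your last pipeline could be rearranged roughly like this:

json {
    source => "message"
    remove_field => [ "message" ]   # only removed if the JSON parsed successfully
}
mutate {
    remove_field => [ "messages" ]  # always drop the unwanted messages array
}
split {
    field => "datapoints"
}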

If you don't want the junk JSON in your main index you can route it to a different output.

output {
    if "_jsonparsefailure" in [tags] {
        file { ... }
    } else {
        stdout { codec => rubydebug }
    }
}
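
For example, something along these lines would dump the failed events to a file (the path is just a placeholder):

output {
    if "_jsonparsefailure" in [tags] {
        file {
            path => "/tmp/json_failures.log"   # placeholder path for the failed events
        }
    } else {
        stdout { codec => rubydebug }
    }
}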
