Split objects in a nested array into separate records

Hi there,

I have the following data structure in Elasticsearch.

{
  "id": "559466",
  "name": "test",
  "availability": [
    {
      "assetId": "559466",
      "type": "pen",
      "complete": true,
      "availableFrom": "2005-01-01T00:00:00",
      "price": 3.99
    },
    {
      "assetId": "559466",
      "type": "pencil",
      "complete": true,
      "availableFrom": "2005-01-01T00:00:00",
      "price": 4.99
    }
  ]
}

I want the above to be copied into a database table that has the following fields for each record.

id, name, assetId, type, complete, availableFrom, price.

I would like to know how to achieve this in Logstash. Thanks in advance.

Use a split filter.

If, post-split, you want to move the fields of availability to the top-level, use a ruby filter, like this.
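
To make the intended result concrete, here is a minimal plain-Ruby sketch of what a split followed by a flatten does to the sample document. It is only a model: the hashes stand in for Logstash events, `split_and_flatten` is a hypothetical helper rather than part of any Logstash API, and it is not the linked code.

```ruby
# Plain-Ruby model of a Logstash event; this is not the Logstash Event API.
# split_and_flatten is a hypothetical helper used only for illustration.
def split_and_flatten(event, field)
  items = event[field]
  # In this model, events without the array pass through unchanged.
  return [event] unless items.is_a?(Array)

  items.map do |item|
    flat = event.reject { |key, _| key == field } # copy root fields, drop the array
    flat.merge(item)                              # promote the nested keys to the top level
  end
end

doc = {
  "id" => "559466",
  "name" => "test",
  "availability" => [
    { "assetId" => "559466", "type" => "pen", "complete" => true,
      "availableFrom" => "2005-01-01T00:00:00", "price" => 3.99 },
    { "assetId" => "559466", "type" => "pencil", "complete" => true,
      "availableFrom" => "2005-01-01T00:00:00", "price" => 4.99 }
  ]
}

rows = split_and_flatten(doc, "availability")
rows.each { |row| puts row.inspect }
```

Each resulting row carries id, name, assetId, type, complete, availableFrom and price, which matches the column list asked for above.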

Thanks @Badger.

I added the split filter as follows

filter{
  split{
        field => "availability"
  }
}

It is giving me following error.

split - Only String and Array types are splittable. field:availability is of type = NilClass

What could be the reason?

The [availability] field does not exist. Is the structure you showed nested inside something else?

What does the output look like if you use

output { stdout { codec => rubydebug } }

or else, what does the _source field look like if you grab an event from elasticsearch?

The data structure I gave is a copy of _source taken from ES. I need to check whether any objects have no "availability" array. Could that be causing the error?

Once they are split, do I refer to them as root-level attributes, i.e. should I use "price" instead of "[availability][price]"?

If some events do not have the field then use

if [availability] {
    filter{
        split{
            field => "availability"
        }
    }
}

You can either move the attributes to the root level using something like the code I linked to or you can refer to [availability][price]. Your choice.

Thanks @Badger

That helped me get what I need. I had to move the if condition inside the filter block; apart from that, it does the job. I also used ruby code to move the attributes to the root element.

How do I copy "availableFrom" into a timestamp-type column in the DB?

There is a jdbc output plugin written by a third party. It is not supported by Elastic, and the person who wrote it has not modified it in over a year, so it is not clear that they are supporting it either.

Thanks @Badger.

I used the date filter as follows;

date {
    match => ["availableFrom", "yyyy-MM-dd'T'HH:mm:ss"]
    target => "start_date"
}

Afterwards I had to cast it in the INSERT statement, because otherwise the DB treated the start_date attribute as a string even though the date filter converts it to a date type.

statement => ["INSERT INTO products (asset_id, asset_type, available_from) values (?, ?, CAST (? AS timestamp))", "assetId", "type", "start_date"]

This allows inserting the date attribute into a timestamptz (timestamp with time zone) column in the database table.
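
For anyone checking the pattern outside Logstash, here is a rough plain-Ruby equivalent of the parse step. The helper name `parse_available_from` is hypothetical, and the sketch assumes the value is in UTC because it carries no offset; as far as I understand, the date filter itself falls back to the platform default time zone unless its timezone option is set.

```ruby
require "date"
require "time"

# Hypothetical helper; the strptime pattern is the Ruby counterpart of
# the Joda-style pattern "yyyy-MM-dd'T'HH:mm:ss" used in the date filter.
def parse_available_from(value)
  # Assumption: the source value is in UTC because it carries no offset.
  DateTime.strptime(value, "%Y-%m-%dT%H:%M:%S").to_time.utc
end

start_date = parse_available_from("2005-01-01T00:00:00")
puts start_date.iso8601
```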

Since the split filter clones events, I have another problem: the root-level entries get duplicated as well. I'm thinking of using two separate INSERT statements with two tables, one for the root and another for the nested array items. To avoid duplicates, must I run them as two separate pipelines with two separate configuration files?

It may not be a "must" but the forked-path pattern for pipeline-to-pipeline communications might be useful to you.

To do it in a single pipeline you would use a clone filter to duplicate the event, then process one copy with the split and in the other just retain the fields you want in the root.

Thanks @Badger. That is an awesome idea. If I clone the event and split the cloned copy, how do I make Logstash loop through the split items while processing the original (single) event in the output plugin?

    clone { clones => [ "avail" ] }
    if [type] == "avail" {
        mutate { remove_field => [ "id", "name" ] }
        split { field => "availability" }
        ruby {
            code => 'event.get("availability").each { |k, v| event.set(k,v) }; event.remove("availability")'
        }
    } else {
        mutate { remove_field => [ "availability" ] }
    }

If you need to use different outputs (to use different INSERT statements) then make it conditional upon field existence.
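
As a way to reason about which records end up where, here is a small plain-Ruby model of the clone-then-split flow. It is a sketch under assumptions: hashes stand in for Logstash events, `fork_event` and `target_table` are hypothetical helpers, and the table names are placeholders rather than anything from this thread.

```ruby
# Plain-Ruby model only: hashes stand in for Logstash events.
# The table names are hypothetical placeholders.
def fork_event(event, field)
  # Root copy: keep everything except the nested array.
  root = event.reject { |key, _| key == field }
  # Child copies: one record per array element, item keys at the top level.
  children = (event[field] || []).map(&:dup)
  [root, children]
end

# Route by field existence: only the split copies carry "assetId".
def target_table(record)
  record.key?("assetId") ? "asset_items" : "asset_roots"
end

doc = {
  "id" => "559466",
  "name" => "test",
  "availability" => [
    { "assetId" => "559466", "type" => "pen", "price" => 3.99 },
    { "assetId" => "559466", "type" => "pencil", "price" => 4.99 }
  ]
}

root, children = fork_event(doc, "availability")
([root] + children).each { |r| puts "#{target_table(r)}: #{r.inspect}" }
```

The root record is produced once and each array item once, so neither table receives duplicates; in the real pipeline the same routing would be an output conditional on a field such as [assetId].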
