Logstash filter: aggregate nested arrays

I'm trying to fetch data from MySQL and push it to Elasticsearch using Logstash, but I'm having trouble writing a Logstash config file that suits my needs.

I'm trying to achieve this result:

{
   "products":[
      {
         "id":1,
         "deals":[
            {
               "id":5,
               "options":[
                  {
                     "id":3
                  },
                  {
                     "id":8
                  }
               ]
            }
         ]
      }
   ]
}

In MySQL, each of these has its own table, with these relationships:

Product -> Deal (one-to-many)
Deal -> Deal Option (one-to-many)

To combine them, I have a MySQL view that LEFT JOINs those tables so I can process everything with Logstash.

Here is my current Logstash configuration:

filter {
  aggregate {
    task_id => "%{id}"
    code => "
    map['id'] ||= event.get('id')

    map['deals'] ||= []
    map['deals'] << {'id' => event.get('deal_id')}

    event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 15
  }
}

I got stuck at the part where I need to add deal options to a deal. Is my current code correct so far? If it is, how can I complete it? Thanks for your time!

Take a look at this thread, which should give you some ideas.


Sorry, I had already read that thread, but I don't quite understand exactly how you did it. Could you please elaborate a bit for my use case? It would be really appreciated!

What does your data look like? In JSON notation, is it like this?

{ "product": 1, "deal": 1, "dealOption": 1}
{ "product": 1, "deal": 1, "dealOption": 2}
{ "product": 1, "deal": 2, "dealOption": 75}
{ "product": 14, "deal": 12, "dealOption": 7}

Yes that's correct.

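You could try:

    aggregate {
        task_id => "%{product}"
        code => '
            map["id"] ||= event.get("product")

            # map["deals"] is a sparse array indexed by deal id
            deal = event.get("deal")
            map["deals"] ||= []
            map["deals"][deal] ||= []

            # each deal entry is a sparse array indexed by option id
            option = event.get("dealOption")
            map["deals"][deal][option] ||= []
            map["deals"][deal][option] << { "id" => option }

            # drop the per-row event; only the aggregated map is emitted
            event.cancel()
        '
        timeout => 5
        push_map_as_event_on_timeout => true
        timeout_code => '
            # runs on the event created from the map when the task times out
            deals = event.get("deals")

            # turn the sparse arrays back into lists of ids, using the
            # indices of the non-nil entries as the deal and option ids
            newDeals = []
            deals.each_index { |x|
                if deals[x]
                    newOptions = []
                    deals[x].each_index { |y|
                        if deals[x][y]
                            newOptions << { "id" => y }
                        end
                    }
                    newDeals << { "id" => x, "options" => newOptions }
                end
            }
            event.set("deals", newDeals)
        '
    }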

That builds map["deals"] as an array of arrays, using the deal id as the outer index and the option id as the inner index, so most of the entries will be nil. For example, if product 3 has deal 6, and that deal has options 1 and 2, the map entry will look like this:

{"id"=>3, "deals"=>[nil, nil, nil, nil, nil, nil, [nil, [{"id"=>1}], [{"id"=>2}]]]}

The timeout code then walks through every array and collects the indices (that is, the ids) of all entries that are not nil.
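One drawback of using ids as array indices is that a single large id (deal 100000, say) makes Ruby allocate an array with 100001 slots for that product. A different technique, not the one from this thread, is to keep map["deals"] in its final shape from the start and look deals and options up by id. Below is a minimal plain-Ruby sketch of that idea; add_row is a hypothetical helper, and inside the aggregate code option its arguments would come from event.get (this has not been tested inside Logstash). Because the map already matches the target structure, no timeout_code conversion would be needed.

```ruby
# Alternative to the sparse-array approach (an assumption, not from the
# thread): map["deals"] is always an array of { "id", "options" } hashes,
# and entries are found by comparing ids instead of indexing by id.
def add_row(map, product, deal, option)
  map["id"] ||= product
  map["deals"] ||= []

  # find the existing entry for this deal, or append a new one
  entry = map["deals"].find { |d| d["id"] == deal }
  unless entry
    entry = { "id" => deal, "options" => [] }
    map["deals"] << entry
  end

  # skip duplicate rows for the same option
  unless entry["options"].any? { |o| o["id"] == option }
    entry["options"] << { "id" => option }
  end
  map
end

map = {}
add_row(map, 3, 6, 1)
add_row(map, 3, 6, 2)
add_row(map, 3, 2, 11)
p map
```

The lookups are linear in the number of deals and options per product, which is usually fine for short lists. Deals keep the order in which they first appear, rather than being sorted by id as in the sparse-array version.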

Overall, that will convert

{ "product": 3, "deal": 6, "dealOption": 1}
{ "product": 3, "deal": 6, "dealOption": 2}
{ "product": 3, "deal": 2, "dealOption": 11}
{ "product": 3, "deal": 2, "dealOption": 18}
{ "product": 14, "deal": 12, "dealOption": 7}

into

{
    "deals": [
        {
            "options": [
                { "id": 11 },
                { "id": 18 }
            ],
            "id": 2
        },
        {
            "options": [
                { "id": 1 },
                { "id": 2 }
            ],
            "id": 6
        }
    ],
    "id": 3,
    "@version": "1",
    "@timestamp": 2021-08-10T21:45:55.935Z
}

etc.
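To check that conversion without running Logstash, here is a minimal plain-Ruby sketch that replays the rows above through the same code and timeout_code logic. FakeEvent, aggregate_row and on_timeout are hypothetical names: FakeEvent only mimics the get, set and cancel calls the snippets use, and the real filter's timeouts, worker threads and event ordering are not simulated.

```ruby
# Hypothetical stand-in for a Logstash event: just enough to run the
# aggregate snippets outside Logstash.
class FakeEvent
  attr_reader :data

  def initialize(data)
    @data = data
  end

  def get(field)
    @data[field]
  end

  def set(field, value)
    @data[field] = value
  end

  def cancel
    # a real event would be dropped from the pipeline here
  end
end

# Same logic as the aggregate "code" option, applied to one row.
def aggregate_row(event, map)
  map["id"] ||= event.get("product")

  deal = event.get("deal")
  map["deals"] ||= []
  map["deals"][deal] ||= []

  option = event.get("dealOption")
  map["deals"][deal][option] ||= []
  map["deals"][deal][option] << { "id" => option }

  event.cancel
end

# Same logic as "timeout_code": sparse arrays become lists of ids.
def on_timeout(event)
  deals = event.get("deals")
  new_deals = []
  deals.each_index do |x|
    next unless deals[x]
    new_options = []
    deals[x].each_index do |y|
      new_options << { "id" => y } if deals[x][y]
    end
    new_deals << { "id" => x, "options" => new_options }
  end
  event.set("deals", new_deals)
end

rows = [
  { "product" => 3, "deal" => 6, "dealOption" => 1 },
  { "product" => 3, "deal" => 6, "dealOption" => 2 },
  { "product" => 3, "deal" => 2, "dealOption" => 11 },
  { "product" => 3, "deal" => 2, "dealOption" => 18 },
  { "product" => 14, "deal" => 12, "dealOption" => 7 }
]

# One map per task_id; "%{product}" yields a string, hence to_s.
maps = Hash.new { |h, k| h[k] = {} }
rows.each { |row| aggregate_row(FakeEvent.new(row), maps[row["product"].to_s]) }

# Simulate the timeout: each map becomes an event, then timeout_code runs.
results = maps.values.map do |map|
  event = FakeEvent.new(map)
  on_timeout(event)
  event.data
end
p results
```

Product 14 comes out as a second event with a single deal 12 holding option 7, which is what the "etc." above stands for.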


Okay, so I started by changing what I had before:

    map['deals'] ||= []
    map['deals'] << {'id' => event.get('deal_id')}

to:

    deal = event.get("deal")
    map["deals"] ||= []
    map["deals"][deal] ||= []

and now I'm getting an aggregate exception (no implicit conversion from nil to integer).

I was able to solve this by adding a few if checks. Thank you very much!
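The exception comes from using nil as an array index: if event.get("deal") returns nil, map["deals"][deal] raises a TypeError. That can happen when a LEFT JOIN row has no matching deal, or possibly when the field name does not match (the first config above reads deal_id, while the snippet reads deal). The actual checks used here were not shared; below is a minimal sketch of what such checks might look like, with the event values passed in as plain arguments to a hypothetical aggregate_row_safe helper.

```ruby
# Sketch of the aggregate "code" body with nil checks (hypothetical helper;
# in Logstash the arguments would come from event.get).
def aggregate_row_safe(map, product, deal, option)
  map["id"] ||= product
  map["deals"] ||= []

  # a LEFT JOIN row may have no deal; nil is not a valid array index,
  # which is what raises "no implicit conversion from nil to integer"
  if deal
    map["deals"][deal] ||= []
    # likewise, a deal may have no options
    if option
      map["deals"][deal][option] ||= []
      map["deals"][deal][option] << { "id" => option }
    end
  end
  map
end

map = {}
aggregate_row_safe(map, 5, nil, nil) # product with no deals
aggregate_row_safe(map, 5, 1, nil)   # deal with no options
aggregate_row_safe(map, 5, 1, 4)
p map
```

With these guards, a deal that has no options still gets an entry, so the timeout code would emit it with an empty options list.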
