Aggregate filter plugin - nested aggregation

Hello!

I am trying to aggregate some data from a database with Logstash.
My data in the database looks like this:

+----+------------+--------------+---------------+----------------+
| #  | product_id | product_name | property_name | property_value |
+----+------------+--------------+---------------+----------------+
| 1  | 100        | pc           | colour        | black          |
| 2  | 100        | pc           | colour        | silver         |
| 3  | 100        | pc           | ram           | 16Gb           |
| 4  | 100        | pc           | hdd           | 200Gb          |
| 5  | 101        | printer      | colour        | black          |
| 6  | 101        | printer      | features      | wifi           |
| 7  | 101        | printer      | features      | scanner        |
| 8  | 101        | printer      | type          | mate           |
| 9  | 102        | laptop       | features      | wifi 5Ghz      |
| 10 | 102        | laptop       | colour        | white          |
| 11 | 102        | laptop       | hdd           | 512Gb          |
+----+------------+--------------+---------------+----------------+

I want to aggregate the data by product_id and then by property_name, like this:

[
    {
        "id": 100,
        "name": "pc",
        "properties": {
            "colour": [
                "black",
                "silver"
            ],
            "hdd": [
                "200Gb"
            ],
            "ram": [
                "16Gb"
            ]
        }
    },
    {
        "id": 101,
        "name": "printer",
        "properties": {
            "features": [
                "wifi",
                "scanner"
            ],
            "colour": [
                "black"
            ],
            "type": [
                "mate"
            ]
        }
    },
    {
        "id": 102,
        "name": "laptop",
        "properties": {
            "features": [
                "wifi 5Ghz"
            ],
            "colour": [
                "white"
            ],
            "hdd": [
                "512Gb"
            ]
        }
    }
]

For this I am trying to use the aggregate filter plugin. I have read example #4 in the docs and this topic.

Here is my logstash.conf (filter part):

filter {
  aggregate {
    task_id => "%{product_id}"
    code => "
      map['product_id'] = event.get('product_id')
      map['product_name'] = event.get('product_name')
      map['properties'] ||= {}

      map[event.get('property_name')] ||= []
      map[event.get('property_name')] << event.get('property_value')

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}

The output of filtering is:

{"product_id":100, "product_name":"pc", "properties":[], "colour":["black","silver"], "ram":["16Gb"], "hdd":["200Gb"]}
{"product_id":101, "product_name":"printer", "properties":[], "colour":["black"], "type":["mate"], "features":["wifi","scanner"]}
{"product_id":102, "product_name":"laptop", "properties":[], "colour":["white"],"hdd":["512Gb"], "features":["wifi 5Ghz"]}

But I need the properties (like "colour", "ram", "hdd") to be nested inside the "properties" field.
To do that I tried:

map['properties'] ||= []
map['properties'] << {
   map[event.get('property_name')] ||= []
   map[event.get('property_name')] << event.get('property_value')
}

But that doesn't work.
I'm not familiar with that syntax, so does anyone know how to put the properties (like "colour", "ram", "hdd") inside "properties"? Am I missing something?
Thank you!

event.set("[properties][" + event.get('property_name') + "]", ...)

Could you explain where this code should go, and what it should replace?
I tried to use it as

map['properties'] <<
        event.set('[properties][' + event.get('property_name') + ']', event.get('property_value'))

With the line above, the "properties" field comes out like this:

"properties":["black","silver","16Gb","200Gb"]
"properties":["black","wifi","scanner","mate"]
...

If I instead put event.set('[properties][' + event.get('property_name') + ']', event.get('property_value')) on its own line, my "properties" field is empty.

But my expectation is:

"properties": {
  "colour": ["black","silver"],
  "hdd": ["200Gb"],
  "ram": ["16Gb"]
}

Sorry, I made the incorrect assumption that if the syntax worked with event.set it would also work with an insertion into map. I was wrong.
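
To illustrate the difference: inside the aggregate code block, map behaves like an ordinary Ruby Hash, so a field-reference string such as [properties][colour] is just a literal key, not a nested path. A minimal plain-Ruby sketch, run outside Logstash with a Hash standing in for map (that stand-in is an assumption made only for illustration):

```ruby
# A plain Hash standing in for the aggregate filter's `map`
# (assumption: run outside Logstash, only to show the Hash behaviour).
map = {}

# A Hash does not interpret field-reference syntax: this creates a single
# top-level key whose name is the whole string.
map['[properties][colour]'] = 'black'

# Nesting in a Hash has to be built explicitly, one level at a time.
map['properties'] ||= {}
map['properties']['colour'] ||= []
map['properties']['colour'] << 'black'

p map.keys
p map['properties']
```

The first assignment leaves a flat key next to "properties", while the explicit version builds the nested structure.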

So I would fix this by adding a mutate filter after the aggregate.

mutate {
    rename => {
        "colour" => "[properties][colour]"
        "features" => "[properties][features]"
        "hdd" => "[properties][hdd]"
        "ram" => "[properties][ram]"
        "type" => "[properties][type]"
    }
}

It is counter-intuitive to me, but although aggregate drops every event, it also creates events that continue down the pipeline.
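
The rename above has to name every property by hand. If the property names are not known in advance, the same move could in principle be done generically. A rough plain-Ruby sketch of the idea, with a Hash in place of the event; TOP_LEVEL_KEYS and nest_properties are hypothetical names, and a real Logstash event also carries fields such as @timestamp that would have to be kept out of "properties":

```ruby
# Keys that stay at the top level (hypothetical list for this example).
TOP_LEVEL_KEYS = %w[product_id product_name].freeze

# Return a new Hash in which every other key is moved under 'properties'.
# Any existing 'properties' entry is discarded and rebuilt.
def nest_properties(flat)
  top  = flat.select { |key, _| TOP_LEVEL_KEYS.include?(key) }
  rest = flat.reject { |key, _| TOP_LEVEL_KEYS.include?(key) || key == 'properties' }
  top.merge('properties' => rest)
end

# Shape of the flat event produced by the original aggregate code.
flat = {
  'product_id' => 100, 'product_name' => 'pc', 'properties' => {},
  'colour' => %w[black silver], 'ram' => ['16Gb'], 'hdd' => ['200Gb']
}

result = nest_properties(flat)
p result
```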

You are right!
Thank you a lot!

It took me a while to figure it out, but this can be done in the aggregate filter.

    aggregate {
        task_id => "%{product_id}"
        code => "
            map['product_id'] = event.get('product_id')
            map['product_name'] = event.get('product_name')

            map['properties'] ||= {}
            if map['properties'].has_key? event.get('property_name')
                map['properties'][event.get('property_name')] << event.get('property_value')
            else
                p = { event.get('property_name') => [ event.get('property_value') ] }
                map['properties'] = map['properties'].merge(p)
            end

            event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 3
    }
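
The has_key?/merge branch can also be written more compactly with ||=. A minimal plain-Ruby sketch, run outside Logstash, that applies the same grouping to a few of the sample rows; the rows array and the aggregate_rows helper are stand-ins for the database events and the filter, not part of the plugin:

```ruby
require 'json'

# A few of the sample rows, as plain Hashes standing in for events.
rows = [
  { 'product_id' => 100, 'product_name' => 'pc', 'property_name' => 'colour', 'property_value' => 'black' },
  { 'product_id' => 100, 'product_name' => 'pc', 'property_name' => 'colour', 'property_value' => 'silver' },
  { 'product_id' => 100, 'product_name' => 'pc', 'property_name' => 'ram', 'property_value' => '16Gb' },
  { 'product_id' => 101, 'product_name' => 'printer', 'property_name' => 'colour', 'property_value' => 'black' }
]

# Hypothetical helper: keeps one map per product_id, mirroring one
# aggregate task per task_id.
def aggregate_rows(rows)
  maps = {}
  rows.each do |row|
    map = (maps[row['product_id']] ||= {})
    map['product_id'] = row['product_id']
    map['product_name'] = row['product_name']
    map['properties'] ||= {}
    # Create the array on first sight of a property name, then append.
    (map['properties'][row['property_name']] ||= []) << row['property_value']
  end
  maps.values
end

result = aggregate_rows(rows)
puts JSON.pretty_generate(result)
```

Inside the aggregate code string, the equivalent single line would be (map['properties'][event.get('property_name')] ||= []) << event.get('property_value').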

Great solution!
