Aggregate filter plugin - need help to make it work

Hello!

I have some data in joined DB tables.

To collect it I use the following query:

SELECT `item`.`id`, `item`.`name`, `colour`.`name` AS `colour` FROM `item`
LEFT JOIN `item_has_colour` AS `ic` ON `ic`.`item` = `item`.`id`
LEFT JOIN `colour` ON `colour`.`id` = `ic`.`colour`
ORDER BY `item`.`id`

So my final dataset looks like this:

mysql> SELECT `item`.`id`, `item`.`name`, `colour`.`name` AS `colour` FROM `item` LEFT JOIN `item_has_colour` AS `ic` ON `ic`.`item` = `item`.`id` LEFT JOIN `colour` ON `colour`.`id` = `ic`.`colour` ORDER BY `item`.`id`;
+----+-------+--------+
| id | name  | colour |
+----+-------+--------+
|  1 | cat   | red    |
|  1 | cat   | green  |
|  1 | cat   | yellow |
|  2 | dog   | red    |
|  2 | dog   | yellow |
|  3 | mouse | red    |
|  3 | mouse | blue   |
+----+-------+--------+
7 rows in set (0,00 sec)

To aggregate the information (colour here) belonging to the same record (cat, dog, or mouse), I'm trying to use the aggregate filter plugin, following its documentation.

So here is my logstash.conf, including the filter section:

input {
  jdbc { 
    jdbc_driver_library => "/home/user/mysql-connector-java-8.0.11.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
    jdbc_user => "root"
    jdbc_password => "root"
    statement => "SELECT `item`.`id`, `item`.`name`, `colour`.`name` AS `colour` FROM `item` LEFT JOIN `item_has_colour` AS `ic` ON `ic`.`item` = `item`.`id` LEFT JOIN `colour` ON `colour`.`id` = `ic`.`colour` ORDER BY `item`.`id`"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['name'] = event.get('name')
      map['colours'] ||= []
      map['colours'] << {'colour' => event.get('colour')}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}
output {
  stdout { codec => json_lines }
  elasticsearch {
    "hosts" => "localhost:9200"
    "index" => "test-migrate"
    "document_type" => "data"
    "document_id" => "%{id}"
  }
}
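For reference, here is a minimal sketch in plain Ruby (outside Logstash, with the rows hard-coded as hashes) of what the `code` block in the aggregate filter is supposed to do. The `maps` hash stands in for the filter's per-task_id map; this single-threaded simulation produces the three aggregated records I expect:

```ruby
# The seven rows returned by the query, as plain hashes.
rows = [
  { 'id' => 1, 'name' => 'cat',   'colour' => 'red'    },
  { 'id' => 1, 'name' => 'cat',   'colour' => 'green'  },
  { 'id' => 1, 'name' => 'cat',   'colour' => 'yellow' },
  { 'id' => 2, 'name' => 'dog',   'colour' => 'red'    },
  { 'id' => 2, 'name' => 'dog',   'colour' => 'yellow' },
  { 'id' => 3, 'name' => 'mouse', 'colour' => 'red'    },
  { 'id' => 3, 'name' => 'mouse', 'colour' => 'blue'   }
]

# One map per task_id ("%{id}"): created on the first event with that id,
# then appended to until the map is pushed as a new event.
maps = Hash.new { |h, k| h[k] = {} }

rows.each do |row|
  map = maps[row['id']]
  map['id']   = row['id']
  map['name'] = row['name']
  map['colours'] ||= []
  map['colours'] << { 'colour' => row['colour'] }
end

aggregated = maps.values
# => one record per id, each with all of its colours collected
```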

The filter content is copied almost unchanged from the example in the documentation, and as far as I understand it, it should work correctly.

But when I run Logstash, something weird happens.

Here is its output:

[INFO ] 2018-07-20 14:50:56.565 [[main]<jdbc] jdbc - (0.026601s) SELECT `item`.`id`, `item`.`name`, `colour`.`name` AS `colour` FROM `item` LEFT JOIN `item_has_colour` AS `ic` ON `ic`.`item` = `item`.`id` LEFT JOIN `colour` ON `colour`.`id` = `ic`.`colour` ORDER BY `item`.`id`
{"name":"cat","@timestamp":"2018-07-20T12:50:56.848Z","colours":[{"colour":"yellow"}],"@version":"1","id":1}
{"name":"mouse","@timestamp":"2018-07-20T12:50:56.848Z","colours":[{"colour":"red"}],"@version":"1","id":3}
{"name":"dog","@timestamp":"2018-07-20T12:50:56.855Z","colours":[{"colour":"yellow"}],"@version":"1","id":2}
{"name":"dog","@timestamp":"2018-07-20T12:50:56.841Z","colours":[{"colour":"red"}],"@version":"1","id":2}
{"name":"cat","@timestamp":"2018-07-20T12:50:56.847Z","colours":[{"colour":"red"},{"colour":"green"}],"@version":"1","id":1}
{"tags":["_aggregatefinalflush"],"colours":[{"colour":"blue"}],"@version":"1","id":3,"@timestamp":"2018-07-20T12:50:57.372Z","name":"mouse"}

Here you can see that there were more than 3 events, and the cat got only 2 of its 3 colours.

Every next run produces different results.

For example:

[INFO ] 2018-07-20 15:27:00.243 [[main]<jdbc] jdbc - (0.017218s) SELECT `item`.`id`, `item`.`name`, `colour`.`name` AS `colour` FROM `item` LEFT JOIN `item_has_colour` AS `ic` ON `ic`.`item` = `item`.`id` LEFT JOIN `colour` ON `colour`.`id` = `ic`.`colour` ORDER BY `item`.`id`
{"id":2,"colours":[{"colour":"red"}],"@version":"1","@timestamp":"2018-07-20T13:27:00.461Z","name":"dog"}
{"id":1,"colours":[{"colour":"yellow"},{"colour":"red"},{"colour":"green"}],"@version":"1","@timestamp":"2018-07-20T13:27:00.507Z","name":"cat"}
{"id":2,"colours":[{"colour":"yellow"}],"@version":"1","@timestamp":"2018-07-20T13:27:00.510Z","name":"dog"}
{"tags":["_aggregatefinalflush"],"id":3,"@version":"1","@timestamp":"2018-07-20T13:27:00.602Z","name":"mouse","colours":[{"colour":"red"},{"colour":"blue"}]}

So here the cat got all 3 of its colours, and the mouse got its 2.

But the dog was split across two separate events, losing its colours.

It looks like the rows fetched by the DB query are processed in random order.

Any clue on how to fix it? Am I missing something?


If you are using an aggregate filter, you have to set --pipeline.workers 1. I think different worker threads are picking up different rows: one thread aggregates red and green for the cat, while the cat's yellow gets processed in a different thread.

Note the second paragraph of the Description section of the docs!
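For example, either on the command line (assuming you run Logstash from its install directory) or in logstash.yml:

```shell
# CLI: force a single pipeline worker so events stay in order
bin/logstash -f logstash.conf --pipeline.workers 1
```

```yaml
# logstash.yml equivalent
pipeline.workers: 1
```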


How could I miss that? :man_facepalming:

Thank you a lot for pointing that out!

It works flawlessly now.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.