Records are getting dropped while creating map using logstash

Hi Experts,

I need your help here. Context is, I am trying to read the data from DB after multiple joins and then forming a Map in Elasticsearch using ID column as agreegate and trying to handle the duplicate event from getting indexed in map. Below are the details step by step I performed. Please do advise where I am going wrong. Thanking you all in advance.

I'm fetching some data from db with multiple associated tables.

My query:

SELECT,, AS colour, AS material FROM item
LEFT JOIN item_has_colour AS ic ON ic.item =
LEFT JOIN colour ON = ic.colour
LEFT JOIN item_has_material AS im ON im.item =
LEFT JOIN material ON = im.material

It produces the next dataset:

mysql> SELECT `item`.`id`, `item`.`name`, `colour`.`name` AS `colour`, `material`.`name` AS `material` FROM `item` LEFT JOIN `item_has_colour` AS `ic` ON `ic`.`item` = `item`.`id` LEFT JOIN `colour` ON `colour`.`id` = `ic`.`colour` LEFT JOIN `item_has_material` AS `im` ON `im`.`item` = `item`.`id` LEFT JOIN `material` ON `material`.`id` = `im`.`material` ORDER BY `item`.`id`;
| id | name  | colour | material_id | material   |
|  1 | cat   | red    | 101         | ceramic    |
|  1 | cat   | green  | 101         | ceramic    |
|  1 | cat   | yellow | 101         | ceramic    |
|  1 | cat   | red    | 102         | plastic    |
|  1 | cat   | green  | 102         | plastic    |
|  1 | cat   | yellow | 102         | plastic    |
|  1 | cat   | green  | 103         | wooden     |
|  1 | cat   | yellow | 103         | wooden     |
|  2 | dog   | red    | 101         | ceramic    |
|  2 | dog   | yellow | 101         | ceramic    |
|  3 | mouse | red    | 101         | ceramic    |
|  3 | mouse | blue   | 101         | ceramic    |
|  3 | mouse | red    | 102         | plastic    |
|  3 | mouse | blue   | 102         | plastic    |
|  3 | cat   | green  | 103         | wooden     |
|  3 | cat   | yellow | 103         | wooden     |
17 rows in set (0,00 sec)

This is my logstash conf code:

filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['name'] = event.get('name')
	  map['material_array'] ||= []
	  materials = { 'material_id' => event.get('material_id') , 'material' => event.get('material') }
        if (event.get('material_id') != nil )
            if ! map['material_array'].include?(materials )
            map['material_array'] << materials
    push_previous_map_as_event => true
    timeout => 20

Output in debug mode:-

    "name" => "cat",
    "@timestamp" => 2022-03-11T15:27:37.393Z,
      "@version" => "1",
      "id" => 1,
      "materials" => [
        [0] {
            "material" => "ceramic"
        [1] {
            "material" => "plastic"
        [2] {
            "material" => "wooden"

But when I search for this in Kibana then I am not getting the complete array. for eg: out of ceramic, plastic, wooden materials only ceramic is present in map or sometimes ceramic and wooden is there but not all 3 values would be there under materials map. only few times proper map was created. Because of this intermittent issue I am not able to understand where it is going wrong. Please do help me as I have production release tomorrow.

@Badger @Fabio-sama @magnusbaeck @codecounselor


Please do not tag people that are not part of the topic, also there is no SLA in this forum, you need to wait to see if someone can answer your question.

What is your input? Share your full pipeline.

You are this pipeline with pipeline.workers set to 1, right?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.