Aggregate plugin does not add all results

Aggregation plugin overwrites data

I am trying to implement an aggregation in Logstash using a MySQL 5.7 database as the source.

select
  u.id as user_id,
  u.first_name as first_name,
  u.last_name as last_name,
  t.id as 'tag_id',
  t.name as 'tag_name'
from cms.users u
inner join cms.tag_user tu on tu.user_id = u.id
left join cms.tags t on t.id = tu.tag_id
order by u.id;

This is a sample of the rows returned by the query:

user_id                               first_name  last_name  tag_id                                tag_name
0b319ef7-d76c-4907-8268-ba23ccbc1e02  Moises      O'Conner   0cf029e0-2ac1-46a3-961c-18b7d602f906  deserunt
0b319ef7-d76c-4907-8268-ba23ccbc1e02  Moises      O'Conner   76e8622b-eee3-4042-87c5-63c66e2b4547  non
0b319ef7-d76c-4907-8268-ba23ccbc1e02  Moises      O'Conner   9b9a4145-3662-45de-91b2-2a20e9c1f879  officia
0b319ef7-d76c-4907-8268-ba23ccbc1e02  Moises      O'Conner   baceefc2-fd95-46f5-ad7e-f6568320fec6  ut
627e2cad-fe31-4f01-b485-688134173eca  Ashly       Gutmann    55e267d3-38f1-40bd-9e64-3876d6ea2e0c  minima
6b46ce73-044d-4991-92da-582fe28d7770  Vergie      O'Connell  16036f94-3f5f-4b65-9cad-ee6edd637943  ex
6b46ce73-044d-4991-92da-582fe28d7770  Vergie      O'Connell  4c472bd7-8ca2-4e64-9e14-c6f1aef8949d  commodi
6b46ce73-044d-4991-92da-582fe28d7770  Vergie      O'Connell  55e267d3-38f1-40bd-9e64-3876d6ea2e0c  minima
6b46ce73-044d-4991-92da-582fe28d7770  Vergie      O'Connell  8c1c8040-b491-4469-a387-e14184acf4d7  earum
6b46ce73-044d-4991-92da-582fe28d7770  Vergie      O'Connell  b6dd716f-5bef-49a2-a679-9a3b0af816b5  doloremque
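The aggregate filter's push_previous_map_as_event option, used in the configuration below, only works when all rows for one task_id arrive one after the other (the plugin documentation warns about this); that is what the order by u.id clause is for. A minimal Ruby sketch that checks this property on the user_id column of the sample above. SAMPLE_IDS and contiguous_task_ids? are hypothetical names for illustration, not part of Logstash:

```ruby
# user_id column of the sample rows above, in query order.
MOISES = '0b319ef7-d76c-4907-8268-ba23ccbc1e02'
ASHLY  = '627e2cad-fe31-4f01-b485-688134173eca'
VERGIE = '6b46ce73-044d-4991-92da-582fe28d7770'
SAMPLE_IDS = [MOISES] * 4 + [ASHLY] + [VERGIE] * 5

# Hypothetical helper: true if each task id appears as one consecutive run,
# i.e. no id shows up again after a different id has been seen.
def contiguous_task_ids?(ids)
  run_heads = ids.chunk_while { |a, b| a == b }.map(&:first)
  run_heads.uniq.size == run_heads.size
end

puts contiguous_task_ids?(SAMPLE_IDS)           # prints true
puts contiguous_task_ids?(SAMPLE_IDS.rotate(2)) # prints false: Moises rows at both ends
```

If the check fails on the rows the filter actually receives, push_previous_map_as_event will start a new map for the same user more than once.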

The result I want is one document per user, with that user's tags collected in an array:

{
  "first_name" : "Moises",
  "last_name" : "O'Conner",
  "user_id" : "0b319ef7-d76c-4907-8268-ba23ccbc1e02",
  "tags" : [
    {
      "tag_id" : "0cf029e0-2ac1-46a3-961c-18b7d602f906",
      "tag_name" : "deserunt"
    },
    {
      "tag_id" : "76e8622b-eee3-4042-87c5-63c66e2b4547",
      "tag_name" : "non"
    },
    {
      "tag_id" : "9b9a4145-3662-45de-91b2-2a20e9c1f879",
      "tag_name" : "officia"
    },
    {
      "tag_id" : "baceefc2-fd95-46f5-ad7e-f6568320fec6",
      "tag_name" : "ut"
    }
  ]
},
{
  "first_name" : "Ashly",
  "last_name" : "Gutmann",
  "user_id" : "627e2cad-fe31-4f01-b485-688134173eca",
  "tags" : [
    {
      "tag_id" : "55e267d3-38f1-40bd-9e64-3876d6ea2e0c",
      "tag_name" : "minima"
    }
  ]
}
.
.
.

The aggregate configuration I have implemented is:


filter {

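  aggregate {
    # One aggregation task per user; rows must arrive grouped by user_id
    task_id => "%{user_id}"
    code => "
      # Copy the user fields once, on the first row of the task
      map['user_id'] ||= event.get('user_id')
      map['first_name'] ||= event.get('first_name')
      map['last_name'] ||= event.get('last_name')

      # Append one tag object per row
      map['tags'] ||= []
      map['tags'] << {
        'tag_id' => event.get('tag_id'),
        'tag_name' => event.get('tag_name')
      }

      # Drop the per-row event; only the aggregated map is emitted
      event.cancel()
    "
    # Emit the previous map as an event when a new user_id shows up
    push_previous_map_as_event => true
    # Seconds before a task expires; the last map is pushed on expiry
    timeout => 5
  }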
}
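For reference, the filter's code block can be replayed in plain Ruby over the sample rows, in query order. This is a simplified single-threaded model, not the aggregate plugin's implementation: aggregate_ordered is a hypothetical helper that keeps one map per task_id and pushes it as soon as a different user_id arrives (the push_previous_map_as_event rule), with event.get replaced by hash lookups.

```ruby
require 'json'

# Simplified model of the aggregate filter with push_previous_map_as_event => true.
# Not the plugin's code: it only applies "push the previous map when a new
# task_id arrives" to events in the order they are given.
def aggregate_ordered(events)
  pushed = []
  current_id = nil
  map = nil
  events.each do |event|
    if event['user_id'] != current_id
      pushed << map if map            # new task id: push the previous map
      current_id = event['user_id']
      map = {}
    end
    # Same statements as the filter's code block, with event.get replaced by [].
    map['user_id'] ||= event['user_id']
    map['first_name'] ||= event['first_name']
    map['last_name'] ||= event['last_name']
    map['tags'] ||= []
    map['tags'] << { 'tag_id' => event['tag_id'], 'tag_name' => event['tag_name'] }
  end
  pushed << map if map                # in Logstash the last map is pushed on timeout
  pushed
end

# Sample rows for Moises and Ashly, taken from the query output above.
moises = ['0b319ef7-d76c-4907-8268-ba23ccbc1e02', 'Moises', "O'Conner"]
ashly  = ['627e2cad-fe31-4f01-b485-688134173eca', 'Ashly', 'Gutmann']
rows = [
  [*moises, '0cf029e0-2ac1-46a3-961c-18b7d602f906', 'deserunt'],
  [*moises, '76e8622b-eee3-4042-87c5-63c66e2b4547', 'non'],
  [*moises, '9b9a4145-3662-45de-91b2-2a20e9c1f879', 'officia'],
  [*moises, 'baceefc2-fd95-46f5-ad7e-f6568320fec6', 'ut'],
  [*ashly,  '55e267d3-38f1-40bd-9e64-3876d6ea2e0c', 'minima']
]
keys = %w[user_id first_name last_name tag_id tag_name]
events = rows.map { |row| keys.zip(row).to_h }

result = aggregate_ordered(events)
puts JSON.pretty_generate(result)
```

In this model the Moises map ends up with all four tags, so the Ruby logic itself produces the desired structure as long as each user's rows reach the filter consecutively.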

The problem is that when the pipeline runs, the aggregated results are incomplete. For Moises, for example, the tags array usually contains only 1 or 2 entries instead of the 4 it should have. In no case do I get the complete set of tags for a user.

I have tried different approaches, but none of them gives the expected result. Any idea what I am doing wrong?

Thank you very much for your help.

Have you set pipeline.workers to 1? If subsets of the data are processed by different worker threads, then you will get aggregates of those subsets instead of the full data.
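A simplified Ruby model of that effect, under stated assumptions: first names stand in for user_id, only tag names are kept, and two hypothetical workers each take half of the ordered rows, with their events reaching the aggregate filter alternately. Real Logstash workers process batches of events (pipeline.batch.size, 125 by default), so actual fragmentation is less regular than this. The rule applied is the push_previous_map_as_event one: an event with a different task id closes the current map and opens a new one.

```ruby
# Simplified model: first names stand in for user_id; only tag names are kept.
Event = Struct.new(:user, :tag)

ORDERED = [
  %w[Moises deserunt], %w[Moises non], %w[Moises officia], %w[Moises ut],
  %w[Ashly minima],
  %w[Vergie ex], %w[Vergie commodi], %w[Vergie minima], %w[Vergie earum], %w[Vergie doloremque]
].map { |user, tag| Event.new(user, tag) }.freeze

# "Push the previous map when a new task id arrives", applied in arrival order.
# A model of push_previous_map_as_event, not the plugin's implementation.
def aggregate(events)
  maps = []
  events.each do |e|
    maps << { user: e.user, tags: [] } if maps.empty? || maps.last[:user] != e.user
    maps.last[:tags] << e.tag
  end
  maps
end

# Hypothetical two-worker interleaving: each worker takes half of the rows,
# and their events reach the aggregate filter alternately.
first_half, second_half = ORDERED.each_slice(ORDERED.size / 2).to_a
interleaved = first_half.zip(second_half).flatten(1)

p aggregate(ORDERED).map { |m| [m[:user], m[:tags].size] }
# prints [["Moises", 4], ["Ashly", 1], ["Vergie", 5]]
p aggregate(interleaved).map { |m| [m[:user], m[:tags].size] }
```

The ordered run yields one complete map per user, while the interleaved run yields ten maps with a single tag each, because every event switches the task id.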

I had not set this parameter. I just ran a test with these values:

pipeline.workers: 1
pipeline.ordered: true

But I get the same result.
