Aggregation plugin overwrites data
I am trying to implement an aggregation with the Logstash aggregate filter, using a MySQL 5.7 database as the source.
select
u.id as user_id, u.first_name as first_name, u.last_name as last_name, t.id as 'tag_id', t.name as 'tag_name'
from cms.users u
inner join cms.tag_user tu on tu.user_id = u.id
left join cms.tags t on t.id = tu.tag_id
order by u.id;
This is a sample of the dataset returned by the SQL query:
user_id | first_name | last_name | tag_id | tag_name |
---|---|---|---|---|
0b319ef7-d76c-4907-8268-ba23ccbc1e02 | Moises | O'Conner | 0cf029e0-2ac1-46a3-961c-18b7d602f906 | deserunt |
0b319ef7-d76c-4907-8268-ba23ccbc1e02 | Moises | O'Conner | 76e8622b-eee3-4042-87c5-63c66e2b4547 | non |
0b319ef7-d76c-4907-8268-ba23ccbc1e02 | Moises | O'Conner | 9b9a4145-3662-45de-91b2-2a20e9c1f879 | officia |
0b319ef7-d76c-4907-8268-ba23ccbc1e02 | Moises | O'Conner | baceefc2-fd95-46f5-ad7e-f6568320fec6 | ut |
627e2cad-fe31-4f01-b485-688134173eca | Ashly | Gutmann | 55e267d3-38f1-40bd-9e64-3876d6ea2e0c | minima |
6b46ce73-044d-4991-92da-582fe28d7770 | Vergie | O'Connell | 16036f94-3f5f-4b65-9cad-ee6edd637943 | ex |
6b46ce73-044d-4991-92da-582fe28d7770 | Vergie | O'Connell | 4c472bd7-8ca2-4e64-9e14-c6f1aef8949d | commodi |
6b46ce73-044d-4991-92da-582fe28d7770 | Vergie | O'Connell | 55e267d3-38f1-40bd-9e64-3876d6ea2e0c | minima |
6b46ce73-044d-4991-92da-582fe28d7770 | Vergie | O'Connell | 8c1c8040-b491-4469-a387-e14184acf4d7 | earum |
6b46ce73-044d-4991-92da-582fe28d7770 | Vergie | O'Connell | b6dd716f-5bef-49a2-a679-9a3b0af816b5 | doloremque |
The output I want is one document per user, with that user's tags collected into an array:
{
"first_name" : "Moises",
"last_name" : "O'Conner",
"user_id" : "0b319ef7-d76c-4907-8268-ba23ccbc1e02",
"tags" : [
{
"tag_id" : "0cf029e0-2ac1-46a3-961c-18b7d602f906",
"tag_name" : "deserunt"
},
{
"tag_id" : "76e8622b-eee3-4042-87c5-63c66e2b4547",
"tag_name" : "non"
},
{
"tag_id" : "9b9a4145-3662-45de-91b2-2a20e9c1f879",
"tag_name" : "officia"
},
{
"tag_id" : "baceefc2-fd95-46f5-ad7e-f6568320fec6",
"tag_name" : "ut"
}
]
},
{
"first_name" : "Ashly",
"last_name" : "Gutmann",
"user_id" : "627e2cad-fe31-4f01-b485-688134173eca",
"tags" : [
{
"tag_id" : "55e267d3-38f1-40bd-9e64-3876d6ea2e0c",
"tag_name" : "minima"
}
]
}
.
.
.
The aggregate configuration I have implemented is:
filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      map['user_id'] ||= event.get('user_id')
      map['first_name'] ||= event.get('first_name')
      map['last_name'] ||= event.get('last_name')
      map['tags'] ||= []
      map['tags'] << {
        'tag_id' => event.get('tag_id'),
        'tag_name' => event.get('tag_name')
      }
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
}
The problem is that when the pipeline runs, the aggregated results are incomplete. For Moises, for example, the tags array usually contains only 1 or 2 entries instead of the 4 it should have.
For no user do I get the full number of tags.
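To make the symptom concrete, here is a minimal plain-Ruby sketch, run outside Logstash, of how I understand `push_previous_map_as_event` to behave: a map is kept for the current `user_id` and pushed as an event as soon as a row with a different `user_id` arrives. The helper `aggregate_rows` and the row layout are hypothetical and only model the behaviour; this is not the plugin's code. The second call feeds the same rows out of user order, which is an assumption about what might happen if events do not reach the filter in query order (for example, with more than one pipeline worker).

```ruby
# Hypothetical model of the aggregate filter with push_previous_map_as_event.
# Not the plugin's implementation; it only mirrors the logic of the code block above.
def aggregate_rows(rows)
  events = []
  current_id = nil
  map = nil
  rows.each do |row|
    if row['user_id'] != current_id
      events << map if map          # a new task_id arrives: push the previous map
      current_id = row['user_id']
      map = {}
    end
    map['user_id']    ||= row['user_id']
    map['first_name'] ||= row['first_name']
    map['last_name']  ||= row['last_name']
    map['tags']       ||= []
    map['tags'] << { 'tag_id' => row['tag_id'], 'tag_name' => row['tag_name'] }
  end
  events << map if map              # last map (flushed by the timeout in Logstash)
  events
end

KEYS   = %w[user_id first_name last_name tag_id tag_name].freeze
MOISES = '0b319ef7-d76c-4907-8268-ba23ccbc1e02'
ASHLY  = '627e2cad-fe31-4f01-b485-688134173eca'

# First five rows of the sample dataset, in query order (order by u.id).
rows = [
  [MOISES, 'Moises', "O'Conner", '0cf029e0-2ac1-46a3-961c-18b7d602f906', 'deserunt'],
  [MOISES, 'Moises', "O'Conner", '76e8622b-eee3-4042-87c5-63c66e2b4547', 'non'],
  [MOISES, 'Moises', "O'Conner", '9b9a4145-3662-45de-91b2-2a20e9c1f879', 'officia'],
  [MOISES, 'Moises', "O'Conner", 'baceefc2-fd95-46f5-ad7e-f6568320fec6', 'ut'],
  [ASHLY,  'Ashly',  'Gutmann',  '55e267d3-38f1-40bd-9e64-3876d6ea2e0c', 'minima']
].map { |values| KEYS.zip(values).to_h }

ordered     = aggregate_rows(rows)
interleaved = aggregate_rows(rows.values_at(0, 1, 4, 2, 3)) # assumed out-of-order arrival

p ordered.map { |e| e['tags'].length }      # → [4, 1]
p interleaved.map { |e| e['tags'].length }  # → [2, 1, 2]
```

With rows in query order, Moises gets all 4 tags in one event. With the interleaved order, the same user is emitted twice with 2 tags each, which looks like the partial arrays I am seeing.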
I have tried different approaches but I don't get the expected results. Any idea what I am doing wrong?
Thank you very much for your help.