Hi everyone,
I have a use case where I need to aggregate the lines of a CSV file to get the total price and total quantity of each order, grouped by order_number. I tried the "aggregate" filter. I do end up with one record per unique order_number, but the values are not summed: I get only the last values for each order_number.
Sample CSV data:
order_number,item_id,item_name,quantity,price
1001,A1,Item1,2,10
1001,A2,Item2,1,15
1002,B1,Item3,3,5
Desired output:
{
  "order_number": "1001",
  "total_price": 25.0,
  "total_quantity": 3
},
{
  "order_number": "1002",
  "total_price": 5.0,
  "total_quantity": 3
}
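To make the expected result concrete, here is a minimal plain-Ruby sketch of the grouping I have in mind, run outside Logstash. It assumes total_price is the plain sum of the price column (which is what gives 25.0 for order 1001) and total_quantity is the sum of the quantity column. The names SAMPLE_CSV and summarize_orders are made up for this illustration.

```ruby
# Sample rows copied from the CSV above.
SAMPLE_CSV = <<~DATA
  order_number,item_id,item_name,quantity,price
  1001,A1,Item1,2,10
  1001,A2,Item2,1,15
  1002,B1,Item3,3,5
DATA

# Hypothetical helper: group rows by order_number and sum quantity and price.
# Assumes simple CSV with no quoted or embedded commas.
def summarize_orders(csv_text)
  header, *rows = csv_text.lines.map { |line| line.strip.split(',') }
  totals = {}
  rows.each do |fields|
    row = header.zip(fields).to_h
    key = row['order_number']
    entry = (totals[key] ||= { 'order_number' => key,
                               'total_price' => 0.0,
                               'total_quantity' => 0 })
    entry['total_price'] += row['price'].to_f
    entry['total_quantity'] += row['quantity'].to_i
  end
  totals.values
end

summarize_orders(SAMPLE_CSV).each { |order| puts order }
```

With the sample rows this yields 25.0 / 3 for order 1001 and 5.0 / 3 for order 1002.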
What I am getting (the last values of price and quantity for each order_number instead of the sum):
{
  "order_number": "1001",
  "total_price": 15.0,
  "total_quantity": 1
},
{
  "order_number": "1002",
  "total_price": 5.0,
  "total_quantity": 3
}
My aggregate filter:
filter {
  aggregate {
    task_id => "%{order_number}"
    code => "
      quan = event.get('quantity') ? event.get('quantity').to_i : 0
      price = event.get('price') ? event.get('price').to_f : 0.0
      map['total_quantity'] ||= 0
      map['total_price'] ||= 0.0
      map['total_quantity'] += quan
      map['total_price'] += quan * price
      map['order_number'] = event.get('order_number')
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "order_number"
    timeout => 10
    timeout_tags => ["aggregated"]
  }
  if !("aggregated" in [tags]) {
    drop {}
  }
}
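To check the Ruby in the code option on its own, here is a rough plain-Ruby simulation of what I understand the filter to do: run the code once per event, with one shared map per task_id. FakeEvent and run_aggregate are hypothetical stand-ins written for this post, not Logstash APIs, and timeouts are not modelled.

```ruby
# Minimal stand-in for a Logstash event; only `get` is modelled.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def get(name)
    @fields[name]
  end
end

# Hypothetical driver: runs the body of the `code` option once per event,
# sharing one map per order_number (the task_id).
def run_aggregate(events)
  maps = Hash.new { |hash, task_id| hash[task_id] = {} }
  events.each do |event|
    map = maps[event.get('order_number')]
    # --- body copied from the filter's code option ---
    quan = event.get('quantity') ? event.get('quantity').to_i : 0
    price = event.get('price') ? event.get('price').to_f : 0.0
    map['total_quantity'] ||= 0
    map['total_price'] ||= 0.0
    map['total_quantity'] += quan
    map['total_price'] += quan * price
    map['order_number'] = event.get('order_number')
  end
  maps.values
end

# The three sample rows, with string values as a CSV parser would produce.
def sample_events
  [
    { 'order_number' => '1001', 'quantity' => '2', 'price' => '10' },
    { 'order_number' => '1001', 'quantity' => '1', 'price' => '15' },
    { 'order_number' => '1002', 'quantity' => '3', 'price' => '5' }
  ].map { |fields| FakeEvent.new(fields) }
end

run_aggregate(sample_events).each { |map| puts map }
```

In this simulation the values do accumulate, so the Ruby itself seems fine when the map is shared across events. Note that because of quan * price it gives 35.0 for order 1001 and 15.0 for order 1002, which matches neither my desired output (plain sum of price) nor what I actually get from Logstash.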
I have set the number of pipeline workers to 1.
Please help me solve this issue.
Thanks in advance.