Hi Everyone,
I have an use case where i need to aggregate the lines in a CSV file to get the sum of amount, quantity of an order based on the ordernumber. I have tried to use "aggregate" filter but i am end up getting one record for unique ordernumber but the values are not getting added instead i am getting last values for the ordernumber.
Sample csv data:
order_number,item_id,item_name,quantity,price
1001,A1,Item1,2,10
1001,A2,Item2,1,15
1002,B1,Item3,3,5
Desired Output:
{
  "order_number": "1001",
  "total_price": 25.0,
  "total_quantity": 3,
},
{
  "order_number": "1002",
  "total_price": 5.0,
  "total_quantity": 3,
}
What i am getting:( Getting the last latest values of price, quantity for the ordernumber instead of sum)
{
  "order_number": "1001",
  "total_price": 15.0,
  "total_quantity": 1,
},
{
  "order_number": "1002",
  "total_price": 5.0,
  "total_quantity": 3,
}
My aggregate filter:
filter {
  aggregate {
    task_id => "%{order_number}"
    code => "
      quan = event.get('quantity') ? event.get('quantity').to_i : 0
      price = event.get('price') ? event.get('price').to_f : 0.0
      map['total_quantity'] ||= 0
      map['total_price'] ||= 0.0
      map['total_quantity'] += quan
      map['total_price'] += quan * price
      map['order_number'] = event.get('order_number')
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "order_number"
    timeout => 10
    timeout_tags => ["aggregated"]
  }
  if !("aggregated" in [tags]) {
    drop {}
  }
}
I have used the pipeline workers as 1.
Please help me to solve this issue.
Thanks in advance.