Need to aggregate data in a CSV file based on a key

Hi Everyone,

I have a use case where I need to aggregate the lines in a CSV file to get the sum of the amount and quantity of an order, keyed on the order number. I have tried the "aggregate" filter, and I do end up with one record per unique order number, but the values are not being added up; instead I get the last values seen for that order number.

Sample csv data:

order_number,item_id,item_name,quantity,price
1001,A1,Item1,2,10
1001,A2,Item2,1,15
1002,B1,Item3,3,5

Desired Output:

{
  "order_number": "1001",
  "total_price": 25.0,
  "total_quantity": 3
},
{
  "order_number": "1002",
  "total_price": 5.0,
  "total_quantity": 3
}

What I am getting (the last values of price and quantity for each order number instead of the sum):

{
  "order_number": "1001",
  "total_price": 15.0,
  "total_quantity": 1
},
{
  "order_number": "1002",
  "total_price": 5.0,
  "total_quantity": 3
}

My aggregate filter:

filter {
  aggregate {
    task_id => "%{order_number}"
    code => "
      quan = event.get('quantity') ? event.get('quantity').to_i : 0
      price = event.get('price') ? event.get('price').to_f : 0.0

      map['total_quantity'] ||= 0
      map['total_price'] ||= 0.0

      map['total_quantity'] += quan
      map['total_price'] += quan * price

      map['order_number'] = event.get('order_number')
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "order_number"
    timeout => 10
    timeout_tags => ["aggregated"]
  }

  if "aggregated" not in [tags] {
    drop {}
  }
}

I have set pipeline.workers to 1, since the aggregate filter requires a single worker.

Please help me to solve this issue.

Thanks in advance.

If I use csv { autodetect_column_names => true } then I get 35.0 and 15.0 for [total_price]. I see nothing wrong with the aggregate filter.
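For what it's worth, the Ruby inside the code block can be checked outside of Logstash. This is a minimal standalone sketch, assuming the fields arrive as strings (as the csv filter would deliver them); run against the sample rows, it sums to 35.0 and 15.0 rather than returning the last values, which is why the aggregate logic itself looks fine:

```ruby
# Standalone check of the Ruby logic from the aggregate block,
# using the sample rows from the question. Field values are strings,
# as they would be after csv parsing.
rows = [
  { 'order_number' => '1001', 'quantity' => '2', 'price' => '10' },
  { 'order_number' => '1001', 'quantity' => '1', 'price' => '15' },
  { 'order_number' => '1002', 'quantity' => '3', 'price' => '5'  },
]

# One map per task_id, mirroring what the aggregate filter keeps internally.
maps = Hash.new { |h, k| h[k] = { 'total_quantity' => 0, 'total_price' => 0.0 } }

rows.each do |event|
  quan  = event['quantity'] ? event['quantity'].to_i : 0
  price = event['price'] ? event['price'].to_f : 0.0

  map = maps[event['order_number']]
  map['total_quantity'] += quan
  map['total_price']    += quan * price
end

maps.each { |order, totals| puts "#{order}: #{totals}" }
```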

Hi @Badger, thank you for the input. Let me try your suggestion.