Calculating sum using the aggregate filter

Hi Team,

We have sample data like this:

  log 1
  {
    "commonid": "test_567",
    "Key": "test1234",
    "amount": 0,
    "information2": "test_567"
  }

  log 2
  {
    "commonid": "test_567",
    "reconKey": "test12345",
    "amount": 5,
    "information2": "test_567"
  }

  log 3
  {
    "commonid": "test_567",
    "reconKey": "test123456",
    "amount": 10,
    "information2": "test_567"
  }

Similarly, we have lakhs (hundreds of thousands) of records.

Output: every record should get a total_amount field containing the sum of amount across all records that share the same common key, i.e. commonid (for commonid test_567 above that is 0 + 5 + 10 = 15). The output should look like this:

  log 1
  {
    "commonid": "test_567",
    "Key": "test1234",
    "amount": 0,
    "information2": "test_567",
    "total_amount": 15
  }

  log 2
  {
    "commonid": "test_567",
    "reconKey": "test12345",
    "amount": 5,
    "information2": "test_567",
    "total_amount": 15
  }

  log 3
  {
    "commonid": "test_567",
    "reconKey": "test123456",
    "amount": 10,
    "information2": "test_567",
    "total_amount": 15
  }
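
In plain Ruby terms, the calculation we are after is just a group-by sum on commonid. A minimal sketch using only the sample values above (just to illustrate the intent, not part of our pipeline):

  logs = [
    { "commonid" => "test_567", "amount" => 0 },
    { "commonid" => "test_567", "amount" => 5 },
    { "commonid" => "test_567", "amount" => 10 }
  ]

  # sum the amounts per commonid
  totals = Hash.new(0)
  logs.each { |l| totals[l["commonid"]] += l["amount"] }
  # totals is now {"test_567"=>15}

  # write the group total back onto every record
  logs.each { |l| l["total_amount"] = totals[l["commonid"]] }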

We are using the aggregate filter in Logstash with the configuration below, but we are not getting any output:

  aggregate {
    task_id => "%{commonid}"
    code => "
      map['commonid'] = event.get('commonid')
      map['total_amount'] ||= 0
      map['total_count'] ||= 0
      map['total_amount'] += event.get('amount').to_f
      map['total_count'] += 1
      event.set('total_amount', map['total_amount'])
      event.set('total_count', map['total_count'])
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "commonid"
    timeout => 60
    end_of_task => true
  }

  # After all aggregation is complete, add final_total_count and set total_amount to all events
  ruby {
    code => "
      event.set('final_total_count', event.get('total_count'))
    "
  }

  # Set total_amount to all events in the group
  ruby {
    code => "
      event.set('total_amount', event.get('total_amount'))
    "
  }
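
To keep the question self-contained: while testing we would check the results with a plain debug output like the one below (a minimal sketch, not our real output section):

  output {
    stdout { codec => rubydebug }
  }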

Can we get all of the data through the elasticsearch input plugin, or is there a limit on the number of records it can fetch? Is there any other way to achieve the above use case?
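
For context, the input side is roughly of this shape; the hosts, index name, query, size and scroll values below are placeholders rather than our actual settings:

  input {
    elasticsearch {
      hosts  => ["http://localhost:9200"]            # placeholder host
      index  => "sample-logs-*"                      # placeholder index pattern
      query  => '{ "query": { "match_all": {} } }'   # fetch everything
      size   => 1000                                 # documents per scroll page
      scroll => "5m"                                 # keep-alive for each scroll page
    }
  }

Our understanding is that this input pages through the index with a scroll, but we are not sure whether there is a practical limit on how many records it can pull back, hence the question above.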