Add '--pipeline.batch.size 1' to the Logstash command line (or set 'pipeline.batch.size: 1' in pipelines.yml for that specific pipeline, or ...).
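For example, either of these would work (the config path and pipeline.id here are just placeholders, substitute your own):

```
# On the command line:
bin/logstash -f /etc/logstash/conf.d/pipeline.conf --pipeline.batch.size 1

# Or per-pipeline in pipelines.yml:
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/pipeline.conf"
  pipeline.batch.size: 1
```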
What is happening is that the first 125 lines of the file (the default pipeline.batch.size) go through the first aggregate filter before any lines reach the second aggregate. When the first event is processed by the second aggregate it deletes the map entry for that task_id, so when the next two lines go through the filter there is no map for that task_id and the filter is a no-op.
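As a rough sketch of the pattern being described (I have not seen your config, so the field names and task_id are assumptions):

```
filter {
  aggregate {
    task_id => "%{task_id}"
    # accumulate each line into the map for this task_id
    code => "map['lines'] ||= []; map['lines'] << event.get('message')"
  }
  # ... other filters ...
  aggregate {
    task_id => "%{task_id}"
    code => "event.set('all_lines', map['lines'])"
    end_of_task => true   # deletes the map entry for this task_id once it runs
  }
}
```

With a 125-event batch, every line hits the first aggregate before any line hits the second, so the first event to reach the second aggregate closes the task and drops the map, and later events for the same task_id find nothing.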
Note that with the Java execution engine enabled, Logstash can re-order events even with a single worker thread, so a solution like this is going to be fragile.