Different values in new runs of Logstash with aggregate filter

I've used the jdbc input plugin to extract financial transactions from a database. Each logical transaction is composed of multiple transactions, so I used the aggregate filter to merge them into one event. Here is the code of my filter:

filter {
  if [type] == 'BUY' {
    aggregate {
      task_id => "%{id}"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "id"
      timeout => 5
      timeout_tags => ['_aggregatetimeout']
      code => "
#        map.merge!(event)
        if (event.get('reason') == 'NET_AMOUNT' && event.get('type') == 'DEBIT' && event.get('account') == event.get('issuer_account'))
            map['_____m_amount'] = event.get('amount')
            event.cancel
        elsif (event.get('reason') == 'NET_AMOUNT' && event.get('type') == 'CREDIT' && event.get('account') == event.get('issuer_account'))
            map['_____r_amount'] = event.get('amount')
            event.cancel
        elsif (event.get('reason') == 'COMMISSION' && event.get('type') == 'DEBIT'  && event.get('account') == 1)
            map['_____m_commission'] = event.get('amount')
            event.cancel
        elsif (event.get('reason') == 'COMMISSION' && event.get('type') == 'CREDIT'  && event.get('account') == event.get('issuer_account'))
            map['_____r_commission'] = event.get('amount')
            event.cancel
        end
        event.cancel
      "
      timeout_code => "event.set('testttttttttt', event.get('id'))"
    }
  }
}

The problem is that the _____m_commission field is never created, even though its condition is true in at least one case (at least one transaction meets it). Also, the values of the other fields vary between runs of Logstash! It seems the order of incoming events affects the results of the if conditions, even though only one event should match each branch.

I've set pipeline.workers: 1 and pipeline.java_execution: false in the /etc/logstash/pipelines.yml file, but nothing changed. I suspected the if statements and searched a lot about Ruby syntax, but it seems to be correct.

What is wrong in this filter?

No, but if the order changes, map entries may get overwritten with different values. I did not know that pipeline.java_execution was still supported, but I think the setting you want is pipeline.ordered.
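To illustrate the overwrite point, here is a minimal plain-Ruby sketch, outside Logstash. It assumes, hypothetically, that two rows with the same id match the same branch; the row values are invented, and the `aggregate` method only imitates the assignment pattern of the first branch in the filter, not the plugin itself.

```ruby
# Hypothetical rows for one task id. Field names mirror the filter above;
# the values are invented for illustration only.
ROWS = [
  { 'reason' => 'NET_AMOUNT', 'type' => 'DEBIT', 'account' => 7, 'issuer_account' => 7, 'amount' => 100 },
  { 'reason' => 'NET_AMOUNT', 'type' => 'DEBIT', 'account' => 7, 'issuer_account' => 7, 'amount' => 250 }
].freeze

# Same assignment pattern as the first branch of the aggregate code:
# every matching row replaces whatever the map already held.
def aggregate(rows)
  map = {}
  rows.each do |row|
    if row['reason'] == 'NET_AMOUNT' && row['type'] == 'DEBIT' && row['account'] == row['issuer_account']
      map['_____m_amount'] = row['amount']
    end
  end
  map
end

in_order = aggregate(ROWS)          # last matching row is the 250 one
reversed = aggregate(ROWS.reverse)  # last matching row is the 100 one

puts in_order.inspect
puts reversed.inspect
```

If the source data really does contain more than one row per branch, the last one to arrive wins, so the result depends on arrival order. Whether that is what happens here can only be checked against the actual rows.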

Without seeing the source data it is impossible to say why _____m_commission is not created.
