I've used the jdbc input plugin to extract financial transactions from a database. However, each logical transaction is composed of multiple database transactions, so I used the aggregate filter to merge them into one event. Here is my filter:
filter {
  if [type] == 'BUY' {
    aggregate {
      task_id => "%{id}"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "id"
      timeout => 5
      timeout_tags => ['_aggregatetimeout']
      code => "
        # map.merge!(event)
        if (event.get('reason') == 'NET_AMOUNT' && event.get('type') == 'DEBIT' && event.get('account') == event.get('issuer_account'))
          map['_____m_amount'] = event.get('amount')
          event.cancel
        elsif (event.get('reason') == 'NET_AMOUNT' && event.get('type') == 'CREDIT' && event.get('account') == event.get('issuer_account'))
          map['_____r_amount'] = event.get('amount')
          event.cancel
        elsif (event.get('reason') == 'COMMISSION' && event.get('type') == 'DEBIT' && event.get('account') == 1)
          map['_____m_commission'] = event.get('amount')
          event.cancel
        elsif (event.get('reason') == 'COMMISSION' && event.get('type') == 'CREDIT' && event.get('account') == event.get('issuer_account'))
          map['_____r_commission'] = event.get('amount')
          event.cancel
        end
        event.cancel
      "
      timeout_code => "event.set('testttttttttt', event.get('id'))"
    }
  }
}
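To check the branch logic in isolation, I also reproduced the `if`/`elsif` chain from the `code` block as plain Ruby. The `FakeEvent` class below is a hypothetical stand-in for the Logstash event API (only `get` and `cancel`), not the real Logstash class; the field names and conditions mirror the filter above:

```ruby
# Hypothetical minimal stand-in for the Logstash event API,
# so the branch conditions can be exercised outside Logstash.
class FakeEvent
  def initialize(fields)
    @fields = fields
    @cancelled = false
  end

  def get(key)
    @fields[key]
  end

  def cancel
    @cancelled = true
  end

  def cancelled?
    @cancelled
  end
end

# Same if/elsif chain as in the aggregate filter's `code` option.
def classify(event, map)
  if event.get('reason') == 'NET_AMOUNT' && event.get('type') == 'DEBIT' &&
     event.get('account') == event.get('issuer_account')
    map['_____m_amount'] = event.get('amount')
  elsif event.get('reason') == 'NET_AMOUNT' && event.get('type') == 'CREDIT' &&
        event.get('account') == event.get('issuer_account')
    map['_____r_amount'] = event.get('amount')
  elsif event.get('reason') == 'COMMISSION' && event.get('type') == 'DEBIT' &&
        event.get('account') == 1
    map['_____m_commission'] = event.get('amount')
  elsif event.get('reason') == 'COMMISSION' && event.get('type') == 'CREDIT' &&
        event.get('account') == event.get('issuer_account')
    map['_____r_commission'] = event.get('amount')
  end
  event.cancel
  map
end

# A sample COMMISSION/DEBIT row with account == 1, which should
# take the _____m_commission branch.
map = {}
e = FakeEvent.new('reason' => 'COMMISSION', 'type' => 'DEBIT',
                  'account' => 1, 'amount' => 2.5)
classify(e, map)
puts map.inspect
```

Run standalone (outside Logstash), this sets `_____m_commission` for the sample event as expected, which is why I believe the conditions themselves are correct.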
The problem is that the _____m_commission
field is never created, even though its if condition is true in at least one case (at least one transaction meets it). Also, the values of the other fields vary across Logstash runs! It seems the order of incoming events affects the results of the if conditions, even though only one event should match each branch.
I've set pipeline.workers: 1
and pipeline.java_execution: false
in the /etc/logstash/pipelines.yml file, but nothing changed. I suspected the if statements and searched a lot about Ruby syntax, but they seem correct.
What is wrong with this filter?