Hi all,
Aggregate filter not working in logstash 5.4.2. I used latest aggregate filter plugin and my config file as follows.
input {
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:@drssqlentrac_sc.aaa-acg.net:1521/orasliud"
jdbc_user => "admin"
jdbc_password => "admin"
jdbc_driver_library => "C:\Official\ojdbc6-11.2.0.3.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
statement => "SELECT CI.FIRST_NAME||'_'||CI.LAST_NAME||'_'||CI.GENDER||'_'||TO_CHAR(CI.DATE_OF_BIRTH,'YYYYMMDD') AS message, CI.TITLE AS prefix, CI.FIRST_NAME AS firstname, CI.MIDDLE_NAME AS middlename, CI.LAST_NAME AS lastname, CI.TITLE_SUFFIX AS suffix, CI.GENDER AS gender, CI.DATE_OF_BIRTH AS dob, CI.MARITAL_STATUS AS maritalstatus, CI.SOCIAL_SECURITY_NUM AS ssn, CI.REF_NUMBER AS clientid, NULL AS customersegmentdescription, NULL AS householdid, CASE WHEN CIP.PRIMARY_FLAG = 'Y' THEN PHONE_NUM END AS primaryphone, CASE WHEN CIP.NUM_TYPE = 'Home' THEN PHONE_NUM END AS homephone, CASE WHEN CIP.NUM_TYPE = 'Office' THEN PHONE_NUM END AS workphone, CIP.EXTENSION AS extension, CASE WHEN CIP.NUM_TYPE = 'Cell' THEN PHONE_NUM END AS mobilephone, CASE WHEN CIP.NUM_TYPE = 'Fax' THEN PHONE_NUM END AS faxphone, CI.EMAIL_ADDRESS AS primaryemail, NULL AS secondaryemail, regexp_replace(CO.ADDRESS_LINE1,'[^a-zA-Z0-9 ]+','') AS address1, regexp_replace(CO.ADDRESS_LINE2,'[^a-zA-Z0-9 ]+','') AS address2, regexp_replace(CO.ADDRESS_LINE3,'[^a-zA-Z0-9 ]+','') AS address3, CO.COUNTY_ID ||' - ' ||CO.COUNTY AS county, CO.CITY AS city, CO.STATE_ID ||' - ' ||CO.STATE AS state, CO.ZIPCODE AS zipcode, CO.COUNTRY AS country, CPA.ADDRESS_TYPE AS addresstype, CI.DL_NUMBER AS licensenumber, CI.DL_STATE_ID AS licensestate, CPS.MEMBERSHIP_NUMBER AS membershipid, NULL AS membershipflag, CPT.POLICY_NUM_PREFIX ||CPT.POLICY_NUM AS policynumber, CPT.STATUS AS policystatus, NULL AS associatedrole, CPY.REF_NUMBER AS agentid, CPY.FULL_NAME AS agentname, NULL AS agentype, NULL AS agentcontactnumber, CPY.FULL_NAME AS agencyname, NULL AS agentnumber FROM CO_INSURED CI LEFT OUTER JOIN CO_INSURED_PHONE_NUM CIP ON CI.CO_ID = CIP.CO_ID LEFT OUTER JOIN CO_ADDRESS CO ON CI.CO_ID = CO.CO_ID LEFT OUTER JOIN CO_PARTY_ADDRESS CPA ON CI.CO_ID = CPA.CO_ID LEFT OUTER JOIN CO_POLICY_SNAPSHOT CPS ON CI.CO_ID = CPS.CO_ID LEFT OUTER JOIN CO_POLICY_TERM CPT ON CI.CO_ID = CPT.CO_ID LEFT OUTER JOIN CO_PARTY CPY ON CI.CO_ID = CPY.CO_ID"
add_field => {
"batch_id" => "SPL%{+YYYYMMdd}"
}
}
}
filter {
ruby {
code => "fieldArray = event.get('firstname').to_s.split(' ')
a = Array.new
fieldArray.map do |word|
a.push(word.capitalize)
end
event.set('firstname',a.join(' '))"
}
ruby {
code => "fieldArray2 = event.get('lastname').to_s.split(' ')
b = Array.new
fieldArray2.map do |word2|
b.push(word2.capitalize)
end
event.set('lastname',b.join(' '))"
}
aggregate {
task_id => "%{message}"
code => "
map['message'] = event.get('message')
map['prefix'] = event.get('prefix')
map['firstname'] = event.get('firstname')
map['middlename'] = event.get('middlename')
map['prefix'] = event.get('prefix')
map['lastname'] = event.get('lastname')
map['suffix'] = event.get('suffix')
map['gender'] = event.get('gender')
map['dob'] = event.get('dob')
map['maritalstatus'] = event.get('maritalstatus')
map['ssn'] = event.get('ssn')
map['clientid'] = event.get('clientid')
map['customersegmentdescription'] = event.get('customersegmentdescription')
map['householdid'] = event.get('householdid')
map['primaryphone'] = event.get('primaryphone')
map['homephone'] = event.get('homephone')
map['workphone'] = event.get('workphone')
map['extension'] = event.get('extension')
map['mobilephone'] = event.get('mobilephone')
map['faxphone'] = event.get('faxphone')
map['primaryemail'] = event.get('primaryemail')
map['secondaryemail'] = event.get('secondaryemail')
map['licensenumber'] = event.get('licensenumber')
map['licensestate'] = event.get('licensestate')
map['membershipid'] = event.get('membershipid')
map['membershipflag'] = event.get('membershipflag')
map['hash_key'] = event.get('hash_key')
map['address'] ||= []
map['address'] << {'address1' => event.get('address1'), 'address2' => event.get('address2'), 'address3' => event.get('address3'), 'county' => event.get('county'), 'city' => event.get('city'), 'state' => event.get('state'), 'zipcode' => event.get('zipcode'), 'country' => event.get('country') , 'addresstype' => event.get('addresstype') }
map['policy'] ||= []
map['policy'] << {'policynumber' => event.get('policynumber'), 'policystatus' => event.get('policystatus'), 'associatedrole' => event.get('associatedrole') }
map['agent'] ||= []
map['agent'] << {'agentid' => event.get('agentid'), 'agentname' => event.get('agentname'), 'agenttype' => event.get('agenttype'), 'agentcontactnumber' => event.get('agentcontactnumber'), 'agencyname' => event.get('agencyname'), 'agentnumber' => event.get('agentnumber') }
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
aggregate_maps_path => "C:/Official/.aggregate_maps"
# end_of_task => true
inactivity_timeout => 3
map_action => "create_or_update"
push_map_as_event_on_timeout => true
timeout_code => "event.set('state', 'timeout')"
timeout_tags => ["aggregate_timeout"]
timeout_task_id_field => "message_failed"
}
}
output {
mongodb {
uri => "mongodb://localhost:27017/"
database => "STAGING2"
collection => "SPL_STAGING2"
isodate => true
}
}
Thanks in Advance!
Arun.