Aggregate filter not working in logstash 5.4.2


(Arun Prakash) #1

Hi all,

Aggregate filter not working in logstash 5.4.2. I used latest aggregate filter plugin and my config file as follows.

input {
    jdbc {
        jdbc_connection_string => "jdbc:oracle:thin:@drssqlentrac_sc.aaa-acg.net:1521/orasliud"
        jdbc_user => "admin"
        jdbc_password => "admin"
        jdbc_driver_library => "C:\Official\ojdbc6-11.2.0.3.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        statement => "SELECT    CI.FIRST_NAME||'_'||CI.LAST_NAME||'_'||CI.GENDER||'_'||TO_CHAR(CI.DATE_OF_BIRTH,'YYYYMMDD') AS message, CI.TITLE               AS prefix,    CI.FIRST_NAME          AS firstname,    CI.MIDDLE_NAME         AS middlename,    CI.LAST_NAME           AS lastname,    CI.TITLE_SUFFIX        AS suffix,    CI.GENDER              AS gender,    CI.DATE_OF_BIRTH       AS dob,    CI.MARITAL_STATUS      AS maritalstatus,    CI.SOCIAL_SECURITY_NUM AS ssn,    CI.REF_NUMBER          AS clientid,    NULL                   AS customersegmentdescription,    NULL                   AS householdid,    CASE      WHEN CIP.PRIMARY_FLAG = 'Y'      THEN PHONE_NUM    END AS primaryphone,    CASE      WHEN CIP.NUM_TYPE = 'Home'      THEN PHONE_NUM    END AS homephone,    CASE      WHEN CIP.NUM_TYPE = 'Office'      THEN PHONE_NUM    END           AS workphone,    CIP.EXTENSION AS extension,    CASE      WHEN CIP.NUM_TYPE = 'Cell'      THEN PHONE_NUM    END AS mobilephone,    CASE      WHEN CIP.NUM_TYPE = 'Fax'      THEN PHONE_NUM    END              AS faxphone,    CI.EMAIL_ADDRESS AS primaryemail,    NULL             AS secondaryemail,    regexp_replace(CO.ADDRESS_LINE1,'[^a-zA-Z0-9 ]+','') AS address1,    regexp_replace(CO.ADDRESS_LINE2,'[^a-zA-Z0-9 ]+','') AS address2,   regexp_replace(CO.ADDRESS_LINE3,'[^a-zA-Z0-9 ]+','') AS address3,    CO.COUNTY_ID    ||' - '    ||CO.COUNTY AS county,    CO.CITY     AS city,    CO.STATE_ID    ||' - '    ||CO.STATE            AS state,    CO.ZIPCODE            AS zipcode,    CO.COUNTRY            AS country,    CPA.ADDRESS_TYPE      AS addresstype,    CI.DL_NUMBER          AS licensenumber,    CI.DL_STATE_ID        AS licensestate,    CPS.MEMBERSHIP_NUMBER AS membershipid,    NULL                  AS membershipflag,    CPT.POLICY_NUM_PREFIX    ||CPT.POLICY_NUM AS policynumber,    CPT.STATUS       AS policystatus,    NULL             AS associatedrole,    CPY.REF_NUMBER   AS agentid,    CPY.FULL_NAME    AS agentname,    NULL             AS agentype,    NULL             AS agentcontactnumber,    CPY.FULL_NAME    AS agencyname,    NULL             AS agentnumber  FROM CO_INSURED CI  LEFT OUTER JOIN CO_INSURED_PHONE_NUM CIP  ON CI.CO_ID = CIP.CO_ID  LEFT OUTER JOIN CO_ADDRESS CO  ON CI.CO_ID = CO.CO_ID  LEFT OUTER JOIN CO_PARTY_ADDRESS CPA  ON CI.CO_ID = CPA.CO_ID  LEFT OUTER JOIN CO_POLICY_SNAPSHOT CPS  ON CI.CO_ID = CPS.CO_ID  LEFT OUTER JOIN CO_POLICY_TERM CPT  ON CI.CO_ID = CPT.CO_ID  LEFT OUTER JOIN CO_PARTY CPY  ON CI.CO_ID = CPY.CO_ID"
  add_field => {
	"batch_id" => "SPL%{+YYYYMMdd}"
  }
    }
}
filter {
ruby {
	code =>	"fieldArray = event.get('firstname').to_s.split(' ')
			a = Array.new
    fieldArray.map do |word|
        a.push(word.capitalize)
    end
		 event.set('firstname',a.join(' '))"
}

ruby {
		code => "fieldArray2 = event.get('lastname').to_s.split(' ')
			b = Array.new
    fieldArray2.map do |word2|
        b.push(word2.capitalize)
    end
		 event.set('lastname',b.join(' '))"
}
     aggregate {
       task_id => "%{message}"
       code => "
		 map['message'] = event.get('message')
		 map['prefix'] = event.get('prefix')
		 map['firstname'] = event.get('firstname')
		 map['middlename'] = event.get('middlename')
		 map['prefix'] = event.get('prefix')
		 map['lastname'] = event.get('lastname')
		 map['suffix'] = event.get('suffix')
		 map['gender'] = event.get('gender')
		 map['dob'] = event.get('dob')
		 map['maritalstatus'] = event.get('maritalstatus')
		 map['ssn'] = event.get('ssn')
		 map['clientid'] = event.get('clientid')
		 map['customersegmentdescription'] = event.get('customersegmentdescription')
		 map['householdid'] = event.get('householdid')
		 map['primaryphone'] = event.get('primaryphone')
		 map['homephone'] = event.get('homephone')
		 map['workphone'] = event.get('workphone')
		 map['extension'] = event.get('extension')
		 map['mobilephone'] = event.get('mobilephone')
		 map['faxphone'] = event.get('faxphone')
		 map['primaryemail'] = event.get('primaryemail')
		 map['secondaryemail'] = event.get('secondaryemail')
		 map['licensenumber'] = event.get('licensenumber')
		 map['licensestate'] = event.get('licensestate')
		 map['membershipid'] = event.get('membershipid')
		 map['membershipflag'] = event.get('membershipflag')
		 map['hash_key'] = event.get('hash_key')
         map['address'] ||= []
         map['address'] << {'address1' => event.get('address1'), 'address2' => event.get('address2'), 'address3' => event.get('address3'), 'county' => event.get('county'), 'city' => event.get('city'), 'state' => event.get('state'), 'zipcode' => event.get('zipcode'), 'country' => event.get('country') , 'addresstype' => event.get('addresstype') }
         map['policy'] ||= []
         map['policy'] << {'policynumber' => event.get('policynumber'), 'policystatus' => event.get('policystatus'), 'associatedrole' => event.get('associatedrole') }
         map['agent'] ||= []
         map['agent'] << {'agentid' => event.get('agentid'), 'agentname' => event.get('agentname'), 'agenttype' => event.get('agenttype'), 'agentcontactnumber' => event.get('agentcontactnumber'), 'agencyname' => event.get('agencyname'), 'agentnumber' => event.get('agentnumber') }
		 event.cancel()
       "
       push_previous_map_as_event => true
       timeout => 3
	   aggregate_maps_path => "C:/Official/.aggregate_maps"
#	   end_of_task => true
	   inactivity_timeout  => 3
	   map_action => "create_or_update"
	   push_map_as_event_on_timeout => true
	   timeout_code => "event.set('state', 'timeout')"
	   timeout_tags => ["aggregate_timeout"]
	   timeout_task_id_field => "message_failed"
     }
}
output {
  mongodb {
    uri => "mongodb://localhost:27017/"
    database => "STAGING2"
    collection => "SPL_STAGING2"
	isodate => true
  }
}

Thanks in Advance!
Arun.


Aggregate filter not working
(Mark Walkom) #2

Why do you think it's not working?


(system) #3

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.