Logstash multilevel aggregation #logstash

Hi,
I am trying to do an aggregation using the logstash-filter-aggregate plugin (2.8.0). I extract the data from a relational database using the jdbc input plugin.

Each teacher can have multiple contact_details (rows in contact_info).

I would like this result:

 "teachers" : [
            {
              "tch_name" : "aaa",
              "social_cat" : "art",
              "tch_id" : 201,
	          "contact_details" : [
			    {
			        "phone_no": ["1111111111","2222222222"],
			        "email_id: [aaa@gmail.com,aaa2@gmail.com]
    		    }
               ]
            }
	]

Database setup

-- Schools: the root entity that teachers and villages reference via sch_id.
-- update_time is filled automatically on insert.
CREATE TABLE mst_school (
    sch_id          INTEGER PRIMARY KEY,
    udise_sch_code  VARCHAR2(50),
    school_name     VARCHAR2(50),
    update_time     TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);

-- Seed data: two schools.
INSERT INTO mst_school (sch_id, udise_sch_code, school_name) VALUES (1, '100', 'AESA');
INSERT INTO mst_school (sch_id, udise_sch_code, school_name) VALUES (2, '200', 'PVP');

-- Teachers, linked to a school through sch_id.
CREATE TABLE teacher_profile (
    teacher_id       INTEGER PRIMARY KEY,
    name             VARCHAR2(50),
    social_category  VARCHAR2(50),
    sch_id           INTEGER REFERENCES mst_school (sch_id),
    startdate        TIMESTAMP,
    update_time      TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);

-- Seed data. startdate strings are parsed with the Oracle mask 'yyyy-MM-dd-hh24:mi'
-- (hh24 = 24-hour clock, mi = minutes); the resulting DATE is stored in the TIMESTAMP column.
INSERT INTO teacher_profile (teacher_id, name, social_category, sch_id, startdate)
    VALUES (201, 'aaa', 'art', 1, TO_DATE('2020-07-01-00:00', 'yyyy-MM-dd-hh24:mi'));
INSERT INTO teacher_profile (teacher_id, name, social_category, sch_id, startdate)
    VALUES (202, 'bbb', 'math', 1, TO_DATE('2020-07-01-00:00', 'yyyy-MM-dd-hh24:mi'));
INSERT INTO teacher_profile (teacher_id, name, social_category, sch_id, startdate)
    VALUES (203, 'ccc', 'phy', 2, TO_DATE('2020-10-22-00:00', 'yyyy-MM-dd-hh24:mi'));

-- Villages, each pointing at a school and carrying its coordinates
-- (NUMBER(12,6): up to 6 digits after the decimal point).
CREATE TABLE village (
    village_id    INTEGER PRIMARY KEY,
    village_name  VARCHAR2(50),
    sch_id        INTEGER REFERENCES mst_school (sch_id),
    latitude      NUMBER(12,6),
    longitude     NUMBER(12,6),
    update_time   TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);

-- Seed data: one village per school.
INSERT INTO village (village_id, village_name, sch_id, latitude, longitude) VALUES (500, 'Pune',   1, 18.5135, 73.7699);
INSERT INTO village (village_id, village_name, sch_id, latitude, longitude) VALUES (600, 'Mumbai', 2, 19.0760, 72.8777);


-- Contact details: one row per (phone, email) pair, several rows per teacher.
-- teacher_ids references teacher_profile.teacher_id.
-- email_id allows 254 characters (the practical maximum length of an email
-- address); 20 characters is too short for many real addresses.
CREATE TABLE contact_info (
    teacher_ids  INTEGER REFERENCES teacher_profile (teacher_id),
    phone_no     VARCHAR2(20),
    email_id     VARCHAR2(254),
    update_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);

-- Seed data: teacher 201 has two contacts, 202 and 203 have one each.
INSERT INTO contact_info (teacher_ids, phone_no, email_id) VALUES (201, '1111111111', 'aaa@mail.com');
INSERT INTO contact_info (teacher_ids, phone_no, email_id) VALUES (201, '2222222222', 'aaa2@mail.com');
INSERT INTO contact_info (teacher_ids, phone_no, email_id) VALUES (202, '3333333333', 'bbb@mail.com');
INSERT INTO contact_info (teacher_ids, phone_no, email_id) VALUES (203, '4444444444', 'ccc@mail.com');

-- Flat school/teacher/village/contact rows. Every join is a LEFT JOIN, so schools
-- without teachers and teachers without contacts are kept (with NULL columns).
-- A teacher appears once per matching (village row x contact row), so
-- the nesting into teachers/contact_details has to happen downstream.
-- ORDER BY keeps all rows of a school adjacent, which an aggregation keyed on
-- sch_id that flushes the previous group on a key change depends on.
SELECT
    CONCAT(s.udise_sch_code, tch.teacher_id) AS comp_id,
    s.sch_id            AS sch_id,
    s.udise_sch_code    AS sch_code,
    s.school_name       AS school_name,
    v.latitude          AS latitude,
    v.longitude         AS longitude,
    tch.teacher_id      AS tch_id,
    tch.name            AS tch_name,
    tch.social_category AS social_cat,
    tch.startdate       AS startdate,
    c.phone_no          AS phone_no,
    c.email_id          AS email_id,
    v.village_id        AS village_id,
    v.village_name      AS village_name,
    v.sch_id            AS vsch_id
FROM mst_school s
LEFT JOIN teacher_profile tch
    ON s.sch_id = tch.sch_id
LEFT JOIN village v
    ON s.sch_id = v.sch_id
LEFT JOIN contact_info c
    ON tch.teacher_id = c.teacher_ids
ORDER BY s.sch_id, tch.teacher_id;

.conf file

# Question version of the pipeline: one event per school/teacher/village/contact row,
# folded into one document per school by the aggregate filter.
# NOTE: the aggregate filter needs a single pipeline worker (pipeline.workers: 1 or -w 1);
# with several workers, rows of one school can be split across partial documents.
input {
jdbc {
	jdbc_connection_string => "jdbc:oracle:thin:@****:1521/DB19C"
	jdbc_driver_library => "/home/user/Downloads/ojdbc8-19.3.0.0.jar"
	jdbc_user => "****"
	jdbc_password => "****"
	jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
	schedule => "* * * * *"
	# Oracle date masks are case-insensitive: MM is the month and MI the minute, so
	# 'yyyy-MM-dd-HH:mm' would print the 12-hour hour followed by the month again.
	# ORDER BY keeps every row of one school adjacent, which push_previous_map_as_event
	# relies on (it flushes the current map as soon as a different sch_id arrives).
	statement => "select concat(s.udise_sch_code, tch.teacher_id) comp_id,
			s.sch_id as sch_id,
			s.udise_sch_code as sch_code,
			s.school_name as school_name,
			v.latitude as latitude,
			v.longitude as longitude,
			tch.teacher_id as tch_id,
			tch.name as tch_name,
			tch.social_category as social_cat,
			to_char(tch.startdate, 'YYYY-MM-DD-HH24:MI') as startdate,
			c.phone_no as phone_no,
			c.email_id as email_id,
			v.village_id as village_id,
			v.village_name as village_name,
			v.sch_id as vsch_id
		from mst_school s
		LEFT JOIN teacher_profile tch on s.sch_id = tch.sch_id
		LEFT JOIN village v on s.sch_id = v.sch_id
		LEFT JOIN contact_info c on tch.teacher_id = c.teacher_ids
		order by s.sch_id, tch.teacher_id"

	tracking_column_type => "numeric"

	jdbc_paging_enabled => true
	jdbc_fetch_size => "500"
	charset => "UTF-8"
	codec => json
	# NOTE(review): the statement does not reference :sql_last_value, so the tracking
	# settings do not change which rows are selected on each scheduled run.
	tracking_column => sch_id
	last_run_metadata_path => "/home/user/Desktop/.sch_id_tracker_file"
    }
}
filter {

		# Groups rows by school. One teacher entry is appended per incoming row, so a
		# teacher with two contact rows is added twice here.
		aggregate {
			task_id => "%{sch_id}"
			code => "
			map['comp_id'] = event.get('comp_id')
			map['sch_id'] = event.get('sch_id')
			map['sch_code'] = event.get('sch_code')
			map['teachers'] ||= []
			map['teachers'] << {
					    'tch_id' => event.get('tch_id'),
					    'tch_name' => event.get('tch_name'),
					    'social_cat' => event.get('social_cat')
					    # TODO: contact details (phone_no / email_id) still to be added
					}

			event.cancel()
			"

		timeout_tags => ["aggregate"]
		push_previous_map_as_event => true
		timeout => 3

		}
}

output {
	elasticsearch {
	 # One document per school; a later event for the same sch_id overwrites it.
	 document_id => "%{sch_id}"
	 index => "school_index"

	}
}

How can I improve my .conf file to get the desired output?

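Resolved. The contact rows are now collapsed per teacher in the SQL query with LISTAGG, and the comma-separated lists are split back into arrays with the mutate filter before aggregating. Working config: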

# Resolved pipeline: read one row per teacher (contacts collapsed with LISTAGG) from
# Oracle and fold the rows into one Elasticsearch document per school (document_id = sch_id).
# NOTE: the aggregate filter needs a single pipeline worker (pipeline.workers: 1 or -w 1);
# with several workers, rows of one school can be split across partial documents.
input {
jdbc {
	jdbc_connection_string => "jdbc:oracle:thin:@*****:1521/DB19C"
	jdbc_driver_library => "*****/ojdbc8-19.3.0.0.jar"
	jdbc_user => "*****"
	jdbc_password => "*****"
	jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
	schedule => "* * * * *"
	# - Both LISTAGGs sort by the same key (phone_no, email_id), so the Nth phone and
	#   the Nth email come from the same contact_info row. Sorting each list by itself
	#   breaks that pairing (e.g. 'aaa2@...' sorts before 'aaa@...').
	# - Oracle date masks are case-insensitive: MM is the month, MI the minute and
	#   HH24 the 24-hour hour ('HH:mm' would print the 12-hour hour and the month).
	# - Every non-aggregated column is either grouped or built only from grouped
	#   columns; teacher_id already identifies the teacher's startdate and contacts.
	# - ORDER BY keeps every row of one school adjacent, which push_previous_map_as_event
	#   relies on (it flushes the current map as soon as a different sch_id arrives).
	statement => "select concat(s.udise_sch_code, tch.teacher_id) comp_id,
			s.sch_id as sch_id,
			s.udise_sch_code as sch_code,
			s.school_name as school_name,
			v.latitude as latitude,
			v.longitude as longitude,
			tch.teacher_id as tch_id,
			tch.name as tch_name,
			tch.social_category as social_cat,
			to_char(tch.startdate, 'YYYY-MM-DD-HH24:MI') as startdate,
			LISTAGG(c.phone_no, ',') WITHIN GROUP (ORDER BY c.phone_no, c.email_id) AS phone_no,
			LISTAGG(c.email_id, ',') WITHIN GROUP (ORDER BY c.phone_no, c.email_id) AS email_id,
			v.village_id as village_id,
			v.village_name as village_name,
			v.sch_id as vsch_id
		from mst_school s
		LEFT JOIN teacher_profile tch on s.sch_id = tch.sch_id
		LEFT JOIN village v on s.sch_id = v.sch_id
		LEFT JOIN contact_info c on tch.teacher_id = c.teacher_ids
		group by s.sch_id, s.udise_sch_code, s.school_name, v.latitude, v.longitude,
			tch.teacher_id, tch.name, tch.social_category, tch.startdate,
			v.village_id, v.village_name, v.sch_id
		order by s.sch_id, tch.teacher_id"

	tracking_column_type => "numeric"

	jdbc_paging_enabled => true
	jdbc_fetch_size => "500"
	charset => "UTF-8"
	codec => json
	# NOTE(review): the statement does not reference :sql_last_value, so the tracking
	# settings do not change which rows are selected on each scheduled run.
	tracking_column => sch_id
	last_run_metadata_path => "/home/weblogic/Desktop/.sch_id_tracker_file"
	tags => "table1"
    }

}
filter {

	# Turn the comma-separated LISTAGG strings back into arrays (both fields in one hash).
	# LISTAGG yields NULL for a teacher without contacts; mutate's split is expected to
	# leave non-string values as they are (verify on your Logstash version).
	mutate {
		split => { "phone_no" => "," "email_id" => "," }
	}


		# Folds all rows of one school (task_id = sch_id) into a single event.
		aggregate {
			task_id => "%{sch_id}"
			code => "
			map['comp_id'] = event.get('comp_id')
			map['sch_id'] = event.get('sch_id')
			map['sch_code'] = event.get('sch_code')

			map['teachers'] ||= []
			map['startdate'] ||= []

			# A school without teachers still yields one row from the LEFT JOIN with
			# all teacher columns null; skip it instead of adding an empty teacher.
			unless event.get('tch_id').nil?
				map['teachers'] << {
					'tch_id' => event.get('tch_id'),
					'tch_name' => event.get('tch_name'),
					'social_cat' => event.get('social_cat'),
					'contact_details' => {
						'phone_no' => event.get('phone_no'),
						'email_id' => event.get('email_id')
					}
				}
				map['startdate'] << { 'startdate' => event.get('startdate') }
			end

			# The village join repeats the same village on every teacher row of a
			# school, so record each village id only once.
			map['village_ids'] ||= []
			village_id = event.get('village_id')
			unless village_id.nil? || map['village_ids'].include?(village_id)
				map['village_ids'] << village_id
			end

			map['location'] = { 'lat' => event.get('latitude'), 'lon' => event.get('longitude') }

			event.cancel()
			"

		timeout_tags => ["aggregate"]
		push_previous_map_as_event => true
		timeout => 3

		}
}

output {
	elasticsearch {
	 # One document per school; a later event for the same sch_id overwrites it.
	 document_id => "%{sch_id}"
	 index => "school_index"

	}
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.