Need help to get logstash output in following format

Hi,
I am struggling to get bellow format

"tch_id" : 201,    
"rosterentry": [
                    {
                        "rolerosterid": 100,
                        "rosterdaterange": [
                            {
                                "rolerosterstartdate": "2020-07-01-00:00",
                                "rolerosterenddate": "2022-08-01-00:00"
                            },
                            {
                                "rolerosterstartdate": "2021-07-02-00:00",
                                "rolerosterenddate": "2022-08-02-00:00"
                            }
                        ]
                    },
                    {
                        "rolerosterid": 101,
                        "rosterdaterange": [
                            {
                                "rolerosterstartdate": "2019-07-03-00:00",
                                "rolerosterenddate": "2019-08-03-00:00"
                            }
                        ]
                    }
                ]
Here Database setup

create table rosterentry (rolerosterid number,rolerosterstartdate date,rolerosterenddate date,teacher_ids integer REFERENCES teacher_profile(teacher_id)); insert into rosterentry (rolerosterid,rolerosterstartdate,rolerosterenddate,teacher_ids)values (100,TO_DATE('2020-07-01-00:00','yyyy-MM-dd-hh24:mi'),TO_DATE('2022-08-01-00:00','yyyy-MM-dd-hh24:mi'),201); insert into rosterentry (rolerosterid,rolerosterstartdate,rolerosterenddate,teacher_ids)values (100,TO_DATE('2020-07-02-00:00','yyyy-MM-dd-hh24:mi'),TO_DATE('2022-08-02-00:00','yyyy-MM-dd-hh24:mi'),201);

insert into rosterentry (rolerosterid,rolerosterstartdate,rolerosterenddate,teacher_ids)values (101,TO_DATE('2019-07-03-00:00','yyyy-MM-dd-hh24:mi'),TO_DATE('2019-08-03-00:00','yyyy-MM-dd-hh24:mi'),201); create table teacher_profile (teacher_id integer primary key,name varchar2(50),social_category varchar2(50),sch_id integer references mst_school(sch_id),startdate TIMESTAMP, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL); insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(201,'aaa','art',1, TO_DATE('2020-07-01-00:00','yyyy-MM-dd-hh24:mi')); --yyyy-MM-dd-HH:mm insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(202,'bbb','math',1, TO_DATE('2020-07-01-00:00','yyyy-MM-dd-hh24:mi')); insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(203,'ccc','phy',2,TO_DATE('2020-10-22-00:00','yyyy-MM-dd-hh24:mi'));

Config file

input {
jdbc {
	jdbc_connection_string => "jdbc:oracle:thin:@*****:1521/DB19C"
	jdbc_driver_library => "/ojdbc8-19.3.0.0.jar"
        jdbc_user => "*****"
        jdbc_password => "*****"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
	schedule => "* * * * *"		
	statement => "select 
			 tch.teacher_id  tch_id, 
			 tch.name  tch_name,
             		 LISTAGG(DISTINCT c.phone_no, ',') WITHIN GROUP (ORDER BY c.phone_no) AS phone_no,
			 LISTAGG(DISTINCT r.rolerosterid, ',') WITHIN GROUP (ORDER BY r.rolerosterid) rolerosterid,
			 LISTAGG(DISTINCT (r.rolerosterid||','||r.rolerosterstartdate||',' ||r.rolerosterenddate), '|') WITHIN GROUP (ORDER BY r.rolerosterid) rosterdaterange
			from teacher_profile tch
			LEFT JOIN rosterentry r on tch.teacher_id = r.teacher_ids 
          		group by tch.teacher_id, s.sch_id,tch.name"			
			
			tracking_column_type => "numeric"
			
			jdbc_paging_enabled => true
			jdbc_fetch_size => "500"
			charset => "UTF-8"
			codec => json
			tracking_column => sch_id
			last_run_metadata_path => "/home/weblogic/Desktop/.sch_id_tracker_file"
			tags=> "table1"
    }

}
filter {

mutate {
        split => { "phone_no" => "," }
	split => { "rolerosterid" => "," }

}


		aggregate {
			task_id => "%{sch_id}"
			code => "
			map['tch_id'] = event.get('tch_id')
			map['rosterentry'] ||= [] 
			map['rosterentry'] << {
				'rolerosterid' => event.get('rolerosterid'),
				'rosterdaterange'=> event.get('rosterdaterange')
				}
			

			event.cancel()
			"
		
		timeout_tags => ["aggregate"]
		push_previous_map_as_event => true
		timeout => 3
		
		} 
}

output {
	elasticsearch {
	 document_id => "%{sch_id}"
	 index => "school_index"

	}
}

Any help will be appreciated.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.