Hi,
I am trying do a aggregation using plugins logstash-filter-aggregate (2.8.0). I extract data from relation database using plugin-jdbc.
Where a teachers has and multiple contact_details.
I would like this result:
"teachers" : [
{
"tch_name" : "aaa",
"social_cat" : "art",
"tch_id" : 201,
"contact_details" : [
{
"phone_no": ["1111111111","2222222222"],
"email_id: [aaa@gmail.com,aaa2@gmail.com]
}
]
}
]
Database setup
create table mst_school (sch_id integer primary key,udise_sch_code varchar2(50),school_name varchar2(50), update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into mst_school (sch_id,udise_sch_code,school_name) values(1,'100','AESA');
insert into mst_school (sch_id,udise_sch_code,school_name) values(2,'200','PVP');
create table teacher_profile (teacher_id integer primary key,name varchar2(50),social_category varchar2(50),sch_id integer references mst_school(sch_id),startdate TIMESTAMP, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(201,'aaa','art',1, TO_DATE('2020-07-01-00:00','yyyy-MM-dd-hh24:mi')); --yyyy-MM-dd-HH:mm
insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(202,'bbb','math',1, TO_DATE('2020-07-01-00:00','yyyy-MM-dd-hh24:mi'));
insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(203,'ccc','phy',2,TO_DATE('2020-10-22-00:00','yyyy-MM-dd-hh24:mi'));
create table village (village_id integer primary key,village_name varchar2(50),sch_id integer references mst_school(sch_id), latitude number(12,6),longitude number(12,6), update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into village (village_id,village_name,sch_id,latitude,longitude) values(500,'Pune',1, 18.5135, 73.7699);
insert into village (village_id,village_name,sch_id,latitude,longitude) values(600,'Mumbai',2,19.0760, 72.8777);
create table contact_info (teacher_ids integer REFERENCES teacher_profile(teacher_id), phone_no varchar2(20), email_id varchar2(20), update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into contact_info (teacher_ids,phone_no,email_id) values (201, '1111111111', 'aaa@mail.com');
insert into contact_info (teacher_ids,phone_no,email_id) values (201, '2222222222', 'aaa2@mail.com');
insert into contact_info (teacher_ids,phone_no,email_id) values (202, '3333333333', 'bbb@mail.com');
insert into contact_info (teacher_ids,phone_no,email_id) values (203, '4444444444', 'ccc@mail.com');
select concat(s.udise_sch_code ,tch.teacher_id) comp_id,
s.sch_id as sch_id,
s.udise_sch_code as sch_code,
s.school_name as school_name,
v.latitude as latitude,
v.longitude as longitude,
tch.teacher_id as tch_id,
tch.name as tch_name,
tch.social_category as social_cat,
tch.startdate as startdate,
c.phone_no as phone_no,
v.village_id as village_id,
v.village_name as village_name,
v.sch_id as vsch_id
from mst_school s
LEFT JOIN teacher_profile tch on s.sch_id = tch.sch_id
LEFT JOIN village v on s.sch_id = v.sch_id
LEFT JOIN contact_info c on tch.teacher_id = c.teacher_ids;
.conf file
input {
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:@****:1521/DB19C"
jdbc_driver_library => "/home/user/Downloads/ojdbc8-19.3.0.0.jar"
jdbc_user => "****"
jdbc_password => "****"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
schedule => "* * * * *"
statement => "select concat(s.udise_sch_code ,tch.teacher_id) comp_id,
s.sch_id as sch_id,
s.udise_sch_code as sch_code,
s.school_name as school_name,
v.latitude as latitude,
v.longitude as longitude,
tch.teacher_id as tch_id,
tch.name as tch_name,
tch.social_category as social_cat,
to_char(tch.startdate,'yyyy-MM-dd-HH:mm') as startdate,
c.phone_no as phone_no,
c.email_id as email_id,
v.village_id as village_id,
v.village_name as village_name,
v.sch_id as vsch_id
from mst_school s
LEFT JOIN teacher_profile tch on s.sch_id = tch.sch_id
LEFT JOIN village v on s.sch_id = v.sch_id
LEFT JOIN contact_info c on tch.teacher_id = c.teacher_ids"
tracking_column_type => "numeric"
jdbc_paging_enabled => true
jdbc_fetch_size => "500"
charset => "UTF-8"
codec => json
tracking_column => sch_id
last_run_metadata_path => "/home/user/Desktop/.sch_id_tracker_file"
}
}
filter {
aggregate {
task_id => "%{sch_id}"
code => "
map['comp_id'] = event.get('comp_id')
map['sch_id'] = event.get('sch_id')
map['sch_code'] = event.get('sch_code')
map['teachers'] ||= []
map['teachers'] << {
'tch_id' => event.get('tch_id'),
'tch_name' => event.get('tch_name'),
'social_cat' => event.get('social_cat')
# Contact details
}
event.cancel()
"
timeout_tags => ["aggregate"]
push_previous_map_as_event => true
timeout => 3
}
}
output {
elasticsearch {
document_id => "%{sch_id}"
index => "school_index"
}
}
How can I improve my conf file to get desired output.