Problem statement : Logstash is not loading all records from Database to elasticsearch correctly and everytime I hit same api gets different results (However sometimes correct but changes on every hit and shows only subset of records under salutations nested field)
http://localhost:9200/staffsalutation/_search
I am observing a weird behaviour of logstash logstash-7.8.0 while loading records from 2 tables with query and configuration as below
Query :
select s.update_time, s.staff_id as staff_id, birth_date, first_name, last_name, gender, hire_date,
st.title AS title_nm, st.from_date AS title_frm_dt, st.to_date AS title_to_dt
from staff s
LEFT JOIN salutation st ON s.staff_id = st.staff_id
order by s.update_time
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
jdbc_driver_library => "C:\\Users\\NS\\.m2\\repository\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
jdbc_user => "postgres"
jdbc_password => "postgres"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "select e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt from employees e LEFT JOIN titles t ON e.emp_no = t.emp_no order by e.update_time"
add_field => { "doctype" => "employee" }
tracking_column_type => "timestamp"
use_column_value =>true
tracking_column => update_time
jdbc_fetch_size => "50000"
}
}
filter {
aggregate {
task_id => "%{staff_id}"
code => "
map['staff_id'] = event.get('staff_id')
map['birth_date'] = event.get('birth_date')
map['first_name'] = event.get('first_name')
map['last_name'] = event.get('last_name')
map['gender'] = event.get('gender')
map['hire_date'] = event.get('hire_date')
map['salutations'] ||= []
map['salutations'] << {
'title_nm' => event.get('title_nm'),'title_frm_dt' => event.get('title_frm_dt'),
'title_to_dt' => event.get('title_to_dt')
}
event.cancel()
"
push_previous_map_as_event => true
timeout => 30
}
}
output {
elasticsearch {
document_id => "%{staff_id}"
index => "staffsalutation"
}
file {
path => "test.log"
codec => line
}
}