Logstash not loading the exact number of records into Elasticsearch, and results change on every hit

Problem statement: Logstash is not loading all records from the database into Elasticsearch correctly, and every time I hit the same API I get different results. (Sometimes the result is correct, but it changes on every hit and shows only a subset of records under the `salutations` nested field.)
http://localhost:9200/staffsalutation/_search
I am observing weird behaviour in Logstash 7.8.0 while loading records from two tables, using the query and configuration below.

Query :
select s.update_time, s.staff_id as staff_id, birth_date, first_name, last_name, gender, hire_date,
st.title AS title_nm, st.from_date AS title_frm_dt, st.to_date AS title_to_dt
from staff s
LEFT JOIN salutation st ON s.staff_id = st.staff_id
order by s.update_time

    input {
        jdbc {
            jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
            jdbc_driver_library => "C:\\Users\\NS\\.m2\\repository\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
            jdbc_user => "postgres"
            jdbc_password => "postgres"
            jdbc_driver_class => "org.postgresql.Driver"
            schedule => "* * * * *"
            statement => "select e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt from employees e LEFT JOIN titles t ON e.emp_no = t.emp_no order by e.update_time"
            add_field => { "doctype" => "employee" }
            tracking_column_type => "timestamp"
            use_column_value => true
            tracking_column => "update_time"
            jdbc_fetch_size => "50000"
        }
    }
    filter {
        aggregate {
            task_id => "%{staff_id}"
            code => "
                map['staff_id'] = event.get('staff_id')
                map['birth_date'] = event.get('birth_date')
                map['first_name'] = event.get('first_name')
                map['last_name'] = event.get('last_name')
                map['gender'] = event.get('gender')
                map['hire_date'] = event.get('hire_date')
                map['salutations'] ||= []
                map['salutations'] << {
                    'title_nm' => event.get('title_nm'),
                    'title_frm_dt' => event.get('title_frm_dt'),
                    'title_to_dt' => event.get('title_to_dt')
                }
                event.cancel()
            "
            push_previous_map_as_event => true
            timeout => 30
        }
    }
    output {
        elasticsearch {
            document_id => "%{staff_id}"
            index => "staffsalutation"
        }
        file {
            path => "test.log"
            codec => line
        }
    }

Found the solution!

1. The query needs an `order by` clause on `emp_no`, so that all rows for one employee arrive consecutively and Logstash can aggregate the dependent entities, such as titles (a one-to-many relationship):

from      employees e
LEFT JOIN titles t ON e.emp_no = t.emp_no
order by e.emp_no
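To see why the sort order matters, here is a small standalone Ruby sketch (with made-up sample data, not the real filter code) that mimics what `push_previous_map_as_event` does: the aggregate filter keeps a single in-flight map and pushes it out as an event whenever the task_id changes, so rows belonging to the same `staff_id` must arrive next to each other.

```ruby
# Simulates the aggregate filter's push_previous_map_as_event behaviour.
# Rows are [staff_id, title] pairs; sample data is hypothetical.
def aggregate(rows)
  docs = []
  map = nil
  rows.each do |staff_id, title|
    if map && map['staff_id'] != staff_id
      docs << map          # task_id changed: previous map is pushed as an event
      map = nil
    end
    map ||= { 'staff_id' => staff_id, 'salutations' => [] }
    map['salutations'] << title
  end
  docs << map if map       # final flush at end of input
  docs
end

sorted   = [[1, 'Engineer'], [1, 'Senior Engineer'], [2, 'Manager']]
unsorted = [[1, 'Engineer'], [2, 'Manager'], [1, 'Senior Engineer']]

p aggregate(sorted).length    # 2 documents: staff 1 keeps both titles
p aggregate(unsorted).length  # 3 documents: staff 1 is split in two
```

With unsorted input, staff 1 produces two separate documents, and since the output uses `document_id => "%{staff_id}"`, whichever one is indexed last overwrites the other, so only a subset of the titles survives.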


2. Since the aggregate filter is used, a single worker thread must process the records; otherwise rows for the same `staff_id` are split across workers and aggregated into separate, incomplete documents (which is why repeated calls to the search URL above return random subsets). Run Logstash with one worker:

logstash -f logstash_config.conf -w 1

This looks like a performance hit, since only one thread processes records, but it can be mitigated by running multiple Logstash config files over disjoint sets of records (e.g. the first 100 `emp_no` values in one file and the second 100 in another), so that the pipelines execute in parallel.
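For example (hypothetical file names and `emp_no` boundaries), each config file could restrict its `statement` to a disjoint range, keeping the `order by` within each partition:

    # logstash_config_1.conf (hypothetical) -- first partition
    statement => "select ... from employees e LEFT JOIN titles t ON e.emp_no = t.emp_no where e.emp_no <= 10100 order by e.emp_no"

    # logstash_config_2.conf (hypothetical) -- second partition
    statement => "select ... from employees e LEFT JOIN titles t ON e.emp_no = t.emp_no where e.emp_no > 10100 order by e.emp_no"

Each file is then started as its own process with `logstash -f <file> -w 1`, so every partition is still aggregated by a single worker.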
