I have set up Logstash with PostgreSQL to send my restaurant and menu item data to Elasticsearch. I fetch the data from PostgreSQL in pages and also use the jdbc_streaming filter.
When I run Logstash with the jdbc_streaming lookup and a small page size, such as 10, 20, or 100, the documents appear in Elasticsearch quickly.
When I run it with a large page size, such as 1000 or 2000, the documents do not appear in Elasticsearch quickly.
My Logstash configuration is below. The same configuration with a large page size performs better against an Oracle database.
# Logstash pipeline: indexes restaurant branches (each enriched with its menu
# items) from PostgreSQL into Elasticsearch, one page of branches per scheduled run.
#
# Debugging the jdbc_streaming lookup:
#   * An event whose lookup returns no rows is tagged "_jdbcstreamingdefaultsused";
#     an event whose lookup fails is tagged "_jdbcstreamingfailure". Both tags are
#     visible in the rubydebug stdout output below.
#   * For plugin log output, start Logstash with --log.level=debug, or raise only
#     this filter's logger at runtime through the logging API, e.g.
#       curl -XPUT 'localhost:9600/_node/logging?pretty' -H 'Content-Type: application/json' \
#            -d '{"logger.logstash.filters.jdbcstreaming": "DEBUG"}'
#     (NOTE(review): confirm the exact logger name with GET localhost:9600/_node/logging).
input {
  jdbc {
    # Same driver jar as the jdbc_streaming filter below (was an empty string).
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/postgresql-42.2.18.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://jahez.c5mmwuonnd6t.eu-central-1.rds.amazonaws.com:5432/jahezdb"
    # Placeholder credentials; consider the Logstash keystore ("${PG_USER}" / "${PG_PASSWORD}").
    jdbc_user => username
    jdbc_password => password

    # Keyset pagination on branch_id. The jdbc input stores the tracking column of
    # the LAST row it receives, so the statement must end with ORDER BY branch_id.
    # Tracking restaurant_id (not unique per row) with ">=" could stall forever
    # once one restaurant had at least LIMIT branches.
    # NOTE(review): assumes branch_id is the unique, numeric key of table branch.
    # NOTE(review): the persisted sql_last_value previously held a restaurant_id;
    # delete the old last-run file (or point last_run_metadata_path at a new file)
    # before deploying this change.
    use_column_value => true
    tracking_column => "branch_id"
    tracking_column_type => "numeric"

    # One page per minute: throughput is (LIMIT) branches per minute.
    schedule => "* * * * *"

    # Column names/aliases are unchanged, except branch_timing and is_branch_open,
    # which were already commented out. The three per-row cuisine subqueries are
    # merged into one LATERAL aggregate.
    statement => "
      SELECT
          b.restaurant_id,
          r.restaurant_name,
          r.slogan,
          r.currency_code,
          b.branch_id,
          b.foodics_hid,
          b.branch_name,
          to_char(b.created, 'DD-MM-YYYY') AS created,
          b.city_id,
          CASE WHEN rcdo.restaurant_id IS NOT NULL THEN 1 ELSE 100 END AS display_rest_city_order,
          CASE WHEN r.hidden = '0' AND b.hidden IS FALSE THEN 'Y' ELSE 'N' END AS show_restaurant,
          CASE WHEN b.foodics_hid IS NOT NULL THEN mmc.menu_class ELSE 'JAHEZ' END AS menu_class,
          is_restaurant_area_closed(st_x(b.location), st_y(b.location)) AS is_area_closed,
          CASE
              WHEN b.delivery_service_type_code = 'O' THEN b.delivery_charge
              WHEN b.delivery_service_type_code = 'T' THEN rc.delivery_charge
          END AS delivery_charges,
          b.home_delivery_available AS delivery_available,
          c.city_name,
          st_x(b.location) AS longitude,
          st_y(b.location) AS latitude,
          8.0 AS rating,
          cu.cuisine_name,
          cu.cuisine_name_ar,
          cu.cuisine_ids,
          1 AS votes,
          CASE WHEN r.logo IS NOT NULL THEN 'Y' END AS logo,
          CASE
              WHEN b.delivery_service_type_code = 'O' THEN 'F'
              WHEN b.delivery_service_type_code = 'T' THEN rc.delivery_charge_type_code
          END AS delivery_charge_type_code,
          rc.vat_enabled AS vat_enabled,
          CASE
              WHEN b.delivery_service_type_code = 'O' THEN 'F'
              WHEN b.delivery_service_type_code = 'T' THEN rc.service_charge_type_code
          END AS service_charge_type_code
      FROM branch AS b
      INNER JOIN city AS c
          ON c.city_id = b.city_id
      INNER JOIN restaurant AS r
          ON r.restaurant_id = b.restaurant_id
      LEFT JOIN master_menu_class AS mmc
          ON mmc.id = r.master_menu_class_id
      LEFT JOIN restaurant_charge AS rc
          ON rc.restaurant_id = b.restaurant_id
      LEFT JOIN restaurant_city_display_order AS rcdo
          ON rcdo.restaurant_id = b.restaurant_id
         AND rcdo.city_id = b.city_id
      LEFT JOIN LATERAL (
          SELECT
              string_agg(cs.cuisine_name, ', ' ORDER BY cs.cuisine_id) AS cuisine_name,
              string_agg(cs.cuisine_name_ar, ', ' ORDER BY cs.cuisine_id) AS cuisine_name_ar,
              string_agg(cs.cuisine_id::text, ',' ORDER BY cs.cuisine_id) AS cuisine_ids
          FROM restaurant_cuisine AS rz
          INNER JOIN cuisine AS cs
              ON cs.cuisine_id = rz.cuisine_id
          WHERE rz.restaurant_id = b.restaurant_id
      ) AS cu ON TRUE
      WHERE b.service_charge IS NOT NULL
        AND b.delivery_charge IS NOT NULL
        AND EXISTS (
            SELECT 1
            FROM restaurant_cuisine AS rz
            INNER JOIN cuisine AS cs
                ON cs.cuisine_id = rz.cuisine_id
            WHERE rz.restaurant_id = r.restaurant_id
        )
        AND b.branch_id > :sql_last_value
      ORDER BY b.branch_id
      LIMIT 10
    "
    codec => plain { charset => "UTF-8" }
  }
}

filter {
  # cuisine_ids arrives as \"1,2,3\"; turn it into an integer array.
  mutate {
    split => { "cuisine_ids" => "," }
  }
  mutate {
    convert => { "cuisine_ids" => "integer" }
  }

  # Build a \"lat,lon\" string for the geo field when both coordinates are present.
  if [latitude] and [longitude] {
    mutate {
      convert => {
        "latitude" => "float"
        "longitude" => "float"
      }
      add_field => { "[geoip][location]" => "%{latitude},%{longitude}" }
    }
  }

  # Attaches the branch's menu items to the event under [menu_items].
  # This filter runs one SQL query per event, so a page of N branches costs N
  # database round trips before those events reach the output. With large pages,
  # most of each run is spent here.
  jdbc_streaming {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/postgresql-42.2.18.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://jahez.c5mmwuonnd6t.eu-central-1.rds.amazonaws.com:5432/jahezdb"
    jdbc_user => username
    jdbc_password => password
    statement => "
      SELECT
          a.menu_item_id,
          a.menu_item_name,
          c.price,
          e.size_desc,
          f.currency_code,
          m.mark_name,
          CASE WHEN a.image IS NOT NULL THEN 'Y' ELSE 'N' END AS menu_logo
      FROM menu_item AS a
      INNER JOIN menu_item_variant AS c
          ON c.menu_item_id = a.menu_item_id
      INNER JOIN menu_item_variant_type AS d
          ON d.menu_item_variant_type_id = c.menu_item_variant_type_id
      INNER JOIN item_size AS e
          ON e.size_id = c.size_id
      INNER JOIN restaurant AS f
          ON f.restaurant_id = a.restaurant_id
      LEFT JOIN mark AS m
          ON m.mark_id = a.mark_id
      WHERE d.is_hidden = 'false'
        AND c.menu_item_variant_id = (
            SELECT MIN(v.menu_item_variant_id)
            FROM menu_item_variant AS v
            WHERE v.menu_item_id = a.menu_item_id
              AND v.deleted = 'N'
        )
        AND a.active = 'Y'
        AND a.restaurant_id = :restaurant_id
        AND (',' || a.hidden_branch_ids || ',' NOT LIKE '%,' || :branch_id || ',%'
             OR a.hidden_branch_ids IS NULL)
        AND is_menu_item_available(a.menu_item_id::double precision, 'Y') = 'Y'
    "
    parameters => {
      "restaurant_id" => "restaurant_id"
      "branch_id" => "branch_id"
    }
    target => "menu_items"
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "logstash-restaurant-index"
    # One document per (restaurant, branch); re-fetched rows overwrite via upsert.
    document_id => "%{restaurant_id}_%{branch_id}"
    action => "update"
    doc_as_upsert => true
    ilm_enabled => false
  }
  # Handy while debugging (shows the _jdbcstreaming* tags), but printing every
  # event costs throughput on large pages.
  stdout { codec => "rubydebug" }
}
How can I see the log of the jdbc_streaming queries? I do not get any log output from the streaming filter.