Postgres with Elastic search issue

I have implemented logstash with postgres to send my restaurant and menu items to elastic search. I have applied paging to fetch the data from postgres and also have jdbc streaming.

If i run the logstash with jdbc streaming query with less paging size like 10, 20, or 100, I could see the documents in elastic search quickly.

If I run the logstash with jdbc streaming query with large page size like 1000, 2000, I could not see the documents in elastic search quickly.

Please find the below my logstash configuration. The same configuration with large page size have better performance with oracle database.

input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://jahez.c5mmwuonnd6t.eu-central-1.rds.amazonaws.com:5432/jahezdb"
    jdbc_user => username
    jdbc_password => password
    use_column_value => true
    tracking_column => restaurant_id
    tracking_column_type => "numeric"
    schedule => "* * * * *"
    statement => "select * from (select dd.restaurant_id as restaurant_id,
                   r.RESTAURANT_NAME,
                   --MMC.menu_class,
                   r.SLOGAN,
                   r.CURRENCY_CODE,
                   dd.BRANCH_ID,
                   dd.FOODICS_HID,
                   dd.BRANCH_NAME,
                   dd.created,
                   dd.CITY_ID,
                   case
                       when (r.restaurant_id = rcdo.restaurant_id and dd.city_id = rcdo.city_id) then 1
                       else 100 end display_rest_city_order,
                   case when (r.hidden = '0' and dd.hidden is false) then 'Y' else 'N' end as show_restaurant,
                   case when dd.FOODICS_HID is not null then MMC.menu_class else 'JAHEZ' end menu_class,
                   IS_RESTAURANT_AREA_CLOSED(st_x(dd.location), st_y(dd.location)) as is_area_closed,
                   CASE
                       WHEN dd.delivery_service_type_code = 'O' THEN dd.delivery_charge
                       WHEN dd.delivery_service_type_code = 'T' THEN rc.DELIVERY_CHARGE
                       ELSE NULL END delivery_charges,
                   dd.HOME_DELIVERY_AVAILABLE as delivery_available,
                   --MOBILE_PKG$IS_BRANCH_OPEN_F(dd.BRANCH_ID::bigint) as is_branch_open,
                   dd.city_name,
                   st_x(dd.LOCATION) longitude,
                   st_y(dd.LOCATION) latitude,
                   8.0 as rating,
                   --dd.branchDistance as distance,
                   (SELECT string_agg(cs.cuisine_name, ', ' ORDER BY cs.cuisine_id)
                    FROM cuisine cs,
                         restaurant_cuisine rz
                    WHERE rz.restaurant_id = dd.restaurant_id
                      AND cs.cuisine_id = rz.cuisine_id
                   ) cuisine_name,
                   (SELECT string_agg(cs.cuisine_name_ar, ', ' ORDER BY cs.cuisine_id )
                    FROM cuisine cs,
                         restaurant_cuisine rz
                    WHERE rz.restaurant_id = dd.restaurant_id
                      AND cs.cuisine_id = rz.cuisine_id
                   ) cuisine_name_ar,
                   (SELECT string_agg(cs.cuisine_id::text, ',' ORDER BY cs.cuisine_id)
                    FROM cuisine cs,
                         restaurant_cuisine rz
                    WHERE rz.restaurant_id = dd.restaurant_id
                      AND cs.cuisine_id = rz.cuisine_id
                   ) cuisine_ids,
                   --MOBILE_PKG$GET_BRANCH_TIMING_F(dd.BRANCH_ID) as branch_timing,
                   1 as votes,
                   case when r.logo is not null then 'Y' else null end as LOGO,
                   CASE
                       WHEN dd.delivery_service_type_code = 'O' THEN 'F'
                       WHEN dd.delivery_service_type_code = 'T' THEN rc.delivery_charge_type_code
                       ELSE NULL END delivery_charge_type_code,
                   rc.vat_enabled as vat_enabled,
                   CASE
                       WHEN dd.delivery_service_type_code = 'O' THEN 'F'
                       WHEN dd.delivery_service_type_code = 'T' THEN rc.service_charge_type_code
                       ELSE NULL END service_charge_type_code
            from (
                     select d.*
                     FROM (
                              select
                                  /*+ first_rows LEADING(c) USE_NL(c b) INDEX(b BRANCH_LOCATION_SPIX)*/
                                  b.BRANCH_ID,
                                  b.BRANCH_NAME,
                                  b.FOODICS_HID,
                                  to_char(b.created, 'DD-MM-YYYY') as created,
                                  b.RESTAURANT_ID,
                                  b.CITY_ID,
                                  b.DELIVERY_CHARGE,
                                  b.HOME_DELIVERY_AVAILABLE,
                                  b.location,
                                  b.DELIVERY_SERVICE_TYPE_CODE,
                                  c.city_name,
                                  b.hidden
                                  --mdsys.sdo_nn_distance(1) branchDistance
                              from branch b,
                                   city c
                              where b.city_id = c.city_id
                                --and b.HIDDEN = 0
                                and b.SERVICE_CHARGE is not null
                                and b.DELIVERY_CHARGE is not null
                              --and IS_RESTAURANT_AREA_CLOSED (b.location.sdo_point.x,b.location.sdo_point.y)='N'
                          ) d
                 ) dd
                     left join restaurant r on r.RESTAURANT_ID = dd.RESTAURANT_ID
                     left join MASTER_MENU_CLASS MMC on r.MASTER_MENU_CLASS_ID = MMC.ID
                     left join restaurant_charge rc on dd.RESTAURANT_ID = rc.RESTAURANT_ID
                     left join app_user au on r.USER_ID = au.USER_ID and au.USER_TYPE_CODE = 'R'
                     left join restaurant_city_display_order rcdo
                               on dd.RESTAURANT_ID = rcdo.restaurant_id and dd.city_id = rcdo.city_id
	            where r.restaurant_id IN (SELECT rz.restaurant_id
                                      from restaurant_cuisine rz,
                                           cuisine cs
                                      WHERE cs.cuisine_id = rz.cuisine_id) ORDER BY dd.restaurant_id) as T1
		where T1.restaurant_id >= :sql_last_value LIMIT 10"

	  codec => plain { charset=>"UTF-8" }
  }
}


filter {
	mutate {
		split => {"cuisine_ids" => ","}
	}

	mutate {
		convert => {"cuisine_ids" => "integer"}
	}

	if [latitude] and [longitude] {
		mutate {
			convert => { "latitude" => "float" }
                    convert => { "longitude" => "float" }
			add_field => ["[geoip][location]", "%{latitude},%{longitude}"]
		}
	}

	jdbc_streaming {
		 jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/postgresql-42.2.18.jar"
	         jdbc_driver_class => "org.postgresql.Driver"
   		 jdbc_connection_string => "jdbc:postgresql://jahez.c5mmwuonnd6t.eu-central-1.rds.amazonaws.com:5432/jahezdb"
   		 jdbc_user => username
		 jdbc_password => password
		 statement => "SELECT A.menu_item_id,
                                   A.menu_item_name,
                                   c.price,
                                   e.size_desc,
                                   f.currency_code,
                 									m.mark_name,																																																																																																																																																																																														                                       case when A.image is not null then 'Y' else 'N' end as menu_logo
                          FROM   menu_item A
                          inner join menu_item_variant c on A.menu_item_id = c.menu_item_id
                               inner join menu_item_variant_type d on c.menu_item_variant_type_id = d.menu_item_variant_type_id
                               inner join item_size e on c.size_id  = e.size_id
                               inner join restaurant f on A.restaurant_id = f.restaurant_id
                               left outer join mark m on a.mark_id = m.mark_id
                          WHERE d.IS_HIDDEN = 'false'
                              AND c.menu_item_variant_id = (SELECT MIN(menu_item_variant_id) FROM menu_item_variant WHERE menu_item_id= A.menu_item_id and DELETED = 'N' )
                              and A.active ='Y' and A.restaurant_id = :restaurant_id
                              and (','||A.hidden_branch_ids||',' not like '%,'||:branch_id||',%' or A.hidden_branch_ids is null)
                              and is_menu_item_available(A.menu_item_id::double precision, 'Y') = 'Y'"

		parameters => { "restaurant_id" => "restaurant_id" "branch_id" => "branch_id"}
	        target => "menu_items"
	}
}																																																																																																																														

output {																																																																																																																																																																	
   elasticsearch {
                  hosts => "localhost:9200"
                  index => "logstash-restaurant-index"
                  document_id => "%{restaurant_id}_%{branch_id}"
                  action => "update"
                  doc_as_upsert => true
                  ilm_enabled => false
	}
	stdout { codec =>  "rubydebug"}
}

How can i see the jdbc streaming query log, I did not get the log about streaming filter.

I suggest you create the view for your long and complex query..
Then don't forget to check your composite index to make the query faster.
For better understanding of your query, use explain to see the cost of the queries.

So, I checked the query and It seems issue with query, now i am written dynamic query in function but it is giving me error.

NOTICE: operator is not unique: unknown - unknown 42725

Please find below my query

-- FUNCTION: jahezdbapp.mobile_pkg$get_branch_timing_fn(double precision)

-- DROP FUNCTION jahezdbapp."mobile_pkg$get_branch_timing_fn"(double precision);

CREATE OR REPLACE FUNCTION jahezdbapp."mobile_pkg$get_branch_timing_fn"(
i_branch_id double precision)
RETURNS text
LANGUAGE 'plpgsql'

COST 100
VOLATILE 

AS BODY
DECLARE
l_current_day DOUBLE PRECISION;
l_current_date TIMESTAMP(0) WITHOUT TIME ZONE;
o_time CHARACTER VARYING(300);
l_mode CHARACTER VARYING(1);
BEGIN
l_current_date := ((now() at time zone 'UTC') at time zone '+03:00');
l_current_day := EXTRACT('DOW' from ((now() at time zone 'UTC') at time zone '+03:00'))+1;

BEGIN
    SELECT
        value
        INTO STRICT l_mode
        FROM jahezdbapp.jz_config
        WHERE key = 'ramadan';
END;

IF l_mode = '1' THEN
    BEGIN
        SELECT
            CASE
                WHEN rsft_beta_end_time = '0 second' THEN (CONCAT_WS('', TO_CHAR(l_current_date + rsft_alpha_start_time::interval, 'hh12:mi AM'), ' - ', TO_CHAR(l_current_date + rsft_alpha_end_time::interval, 'hh12:mi AM')))
                WHEN rsft_alpha_start_time = '0 second' AND rsft_beta_end_time <> '0 second' THEN (CONCAT_WS('', TO_CHAR(l_current_date + rsft_beta_start_time::interval, 'hh12:mi AM'), ' - ', TO_CHAR(l_current_date + rsft_beta_end_time::interval, 'hh12:mi AM')))
                ELSE (CONCAT_WS('', TO_CHAR(l_current_date + rsft_alpha_start_time::interval, 'hh12:mi AM'), ' - ', TO_CHAR(l_current_date + rsft_alpha_end_time::interval, 'hh12:mi AM'), ' & ', TO_CHAR(l_current_date + rsft_beta_start_time::interval, 'hh12:mi AM'), ' - ', TO_CHAR(l_current_date + rsft_beta_end_time::interval, 'hh12:mi AM')))
            END
            INTO STRICT o_time
            FROM jahezdbapp.branch_timings
            WHERE branch_id = i_branch_id AND day_of_week = l_current_day;
			

        IF o_time IS NULL THEN
            RAISE no_data_found USING hint = 'NO_DATA_FOUND';
        END IF;
        EXCEPTION
            WHEN no_data_found THEN
                o_time := 'Coming Soon';
                RETURN o_time;
    END;
ELSE
    BEGIN
        EXECUTE 'SELECT 
            CASE
                WHEN sft_beta_end_time = ''0 second'' THEN (CONCAT_WS('''', TO_CHAR(l_current_date + sft_alpha_start_time::interval, ''hh12:mi AM''), '' - '', TO_CHAR(l_current_date + sft_alpha_end_time::interval, ''hh12:mi AM'')))
                WHEN sft_alpha_start_time = ''0 second'' AND sft_beta_end_time <> ''0 second'' THEN (CONCAT_WS('''', TO_CHAR(l_current_date + sft_beta_start_time::interval,''hh12:mi AM''), '' - '', TO_CHAR(l_current_date + sft_beta_end_time::interval, ''hh12:mi AM'')))
                ELSE (CONCAT_WS('''', TO_CHAR(l_current_date + sft_alpha_start_time::interval, ''hh12:mi AM''), '' - '', TO_CHAR(l_current_date + sft_alpha_end_time::interval, ''hh12:mi AM''), '' & '', TO_CHAR(l_current_date + sft_beta_start_time::interval, ''hh12:mi AM''), ' - ', TO_CHAR(l_current_date + sft_beta_end_time::interval, ''hh12:mi AM'')))
            END
            FROM jahezdbapp.branch_timings
            WHERE branch_id = $1 AND day_of_week = $2'
			INTO o_time USING i_branch_id, l_current_day;

        IF o_time IS NULL THEN
            RAISE no_data_found USING hint = 'NO_DATA_FOUND';
        END IF;
        EXCEPTION
            WHEN no_data_found THEN
                o_time := 'Coming Soon';
                RETURN o_time;
    END;
END IF;
RETURN o_time;
EXCEPTION
    WHEN no_data_found THEN
        o_time := 'Coming Soon';
        RETURN o_time;
    WHEN others THEN
        raise notice '% %', SQLERRM, SQLSTATE;
		RETURN NULL;

END;
BODY;

ALTER FUNCTION jahezdbapp."mobile_pkg$get_branch_timing_fn"(double precision)
OWNER TO jahezdbapp;

I suggest you ask in the proper forum for this query, because this query is not related to the grafana, and the other thing, I'm not DBA that have little knowledge of the SQL query....