Not all data from the database is indexed into Elasticsearch

input {
  jdbc {
    # Staff profiles: runs the statement below on the cron schedule "* * * * *"
    # (every minute) and tags every resulting event "staff" so the output
    # section can route it.
    tags => "staff"
    jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    schedule => "* * * * *"
    jdbc_driver_class => "org.postgresql.Driver"
    # Query outline:
    #   - CTE `sns`: one row per sb_id from staff_sns, with the SNS entries
    #     aggregated into a JSON array cast to varchar.
    #   - CTE `interest`: staff_interest grouped per (sb_id, master_interests.label_jp),
    #     then aggregated per sb_id into a JSON array cast to varchar.
    #   - Main SELECT: staff_profile columns; the code columns (gender, height,
    #     weight, sizes, ...) are replaced by label_jp from the matching master_*
    #     table via scalar subqueries.
    #   - Only rows with last_updated strictly after :sql_last_value and before
    #     NOW() are returned, ordered by last_updated and then sb_id.
    statement => "WITH sns AS
      (SELECT sb_id,
          json_agg(json_build_object('sns_type', sns_type, 'approval_status', approval_status, 'sns_handle', sns_handle))::varchar AS sns_approval_status,
          max(last_updated) AS last_updated
      FROM staff_sns
      GROUP BY sb_id),
      interest AS
      (WITH cte AS
      (SELECT i.sb_id,
             m.label_jp AS interest,
             json_agg(i.tag_name) AS tags_staff
      FROM staff_interest i
      LEFT JOIN master_interests m ON i.interest_id = m.code
      GROUP BY i.sb_id,
               m.label_jp) SELECT sb_id,
                                  json_agg(json_build_object('interest', interest, 'tags_staff', tags_staff))::varchar AS interests
      FROM cte
      GROUP BY cte.sb_id)
      SELECT profile.sb_id,
       profile.user_code,
       profile.store_account_id,
       profile.account_type,
       profile.user_name,
       profile.user_name_kana,
       profile.user_nickname,
       (SELECT label_jp AS user_gender FROM master_gender WHERE code = profile.user_gender LIMIT 1),
       profile.can_display_gender,
       profile.bio,
       profile.label_name_kana,
       profile.label_code,
       profile.label_name,
       profile.shop_code,
       profile.shop_name,
       profile.shop_name_kana,
       profile.front_display_label_code,
       profile.front_display_label_name,
       profile.can_display_front_label,
       profile.can_display_store,
       profile.store_prefecture_code,
       profile.sb_role_id,
       profile.sb_permission_id,
       profile.branch_cd,
       profile.branch_name,
       profile.dob,
       (SELECT label_jp AS user_height FROM master_height WHERE code = profile.user_height LIMIT 1),
       profile.can_display_user_height,
       profile.can_display_age,
       (SELECT label_jp AS user_weight FROM master_weight WHERE code = profile.user_weight LIMIT 1),
       profile.can_display_user_weight,
       (SELECT label_jp AS user_shoe_size FROM master_shoe_size WHERE code = profile.user_shoe_size LIMIT 1),
       profile.can_display_shoe_size,
       (SELECT label_jp AS user_top_size FROM master_top_size WHERE code = profile.user_top_size LIMIT 1),
       (SELECT label_jp AS user_bottom_size FROM master_bottom_size WHERE code = profile.user_bottom_size LIMIT 1),
       profile.can_display_user_bottom_size,
       profile.can_display_size,
       (SELECT label_jp AS user_body_shape FROM master_body_shape WHERE code = profile.user_body_shape LIMIT 1),
       profile.can_display_body_shape,
       (SELECT label_jp AS body_characteristics FROM master_body_characteristics WHERE code = profile.body_characteristics LIMIT 1),
       profile.can_display_body_characteristics,
       (SELECT label_jp AS shoulder_stature FROM master_shoulder_stature WHERE code = profile.shoulder_stature LIMIT 1),
       profile.can_display_shoulder_stature,
       (SELECT label_jp AS personal_color FROM master_personal_colour WHERE code = profile.personal_color LIMIT 1),
      (SELECT label_jp AS fav_styling_1
      FROM master_favorite_styling
      WHERE code = profile.fav_styling_1
      LIMIT 1),
      (SELECT label_jp AS fav_styling_2
      FROM master_favorite_styling
      WHERE code = profile.fav_styling_2
      LIMIT 1),
      (SELECT label_jp AS fav_styling_3
      FROM master_favorite_styling
      WHERE code = profile.fav_styling_3
      LIMIT 1), profile.have_child,
             profile.can_display_have_child,
             profile.status,
             profile.created_at,
             profile.last_updated,
             profile.last_updated_bff,
             profile.last_updated_stm,
             profile.age_range,
             profile.weight_range,
             profile.coubic_url,
             profile.can_display_profile_on_st,
             profile.can_display_user_name,
             profile.can_display_bio,
             profile.can_display_date_of_birth,
             profile.can_display_user_top_size,
             profile.can_display_personal_color,
             profile.can_display_fav_styling,
             profile.can_display_interest,
             profile.crowned_staff,
             profile.follower_count,
             profile.store_visit,
             profile.styling_count,
             profile.csvideo_count,
             profile.cumulation_count,
             sbp.approval_required AS is_post_approval_required,
             sr.role_name,
             spi.image_url_lg AS user_image_url,
             sns.sns_approval_status,
             br.shop_no AS shop_number,
             interest.interests AS interests
      FROM staff_profile profile
      LEFT JOIN brand br ON profile.label_code = br.label_code
      LEFT JOIN sb_permission sbp ON profile.sb_permission_id = sbp.id
      LEFT JOIN sb_role sr ON profile.sb_role_id = sr.id
      LEFT JOIN staff_profile_image spi ON profile.sb_id = spi.sb_id
      LEFT JOIN sns ON sns.sb_id = profile.sb_id
      LEFT JOIN interest ON interest.sb_id = profile.sb_id
	  WHERE profile.last_updated > :sql_last_value::TIMESTAMP AND profile.last_updated < NOW()
    ORDER BY profile.last_updated ASC,
    profile.sb_id ASC"
    # Incremental tracking: :sql_last_value follows the last_updated column
    # (treated as a timestamp) instead of the run time, and is persisted in the
    # file named by last_run_metadata_path.
    use_column_value => true
    tracking_column => "last_updated"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_staff_profile"
    # Paging: the statement is fetched in pages of jdbc_page_size rows.
    # NOTE(review): the LEFT JOINs on brand and staff_profile_image may return
    # more than one row per sb_id if those tables are not unique on the join
    # key — confirm against the schema.
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    jdbc_fetch_size => 10000
  }
  jdbc {
    # Styling posts: runs the statement below on the cron schedule "* * * * *"
    # (every minute) and tags every resulting event "styling" so the output
    # section can route it.
    tags => "styling"
    # NOTE(review): useCursorFetch looks like a MySQL Connector/J option;
    # presumably the PostgreSQL driver ignores it — confirm.
    jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}?useCursorFetch=true"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    schedule => "* * * * *"
    jdbc_driver_class => "org.postgresql.Driver"
    # Query outline:
    #   - CTE `stylings`: non-draft styling_master rows with last_updated strictly
    #     after :sql_last_value and before NOW(), joined to brand and staff_profile.
    #   - CTE `tags`: distinct tag names per styling joined with ' >< ', plus a count.
    #   - CTE `medias`: the styling_media row(s) with seq = 1.
    #   - CTE `products`: non-deleted item codes per styling joined with ' >< ',
    #     plus a count.
    #   - Final SELECT: `stylings` left-joined to the three aggregates.
    #
    # The final ORDER BY ends with styling_id as a tie-breaker. With
    # jdbc_paging_enabled the statement is executed as several independent
    # LIMIT/OFFSET queries; ordering by last_updated alone leaves rows that
    # share a timestamp in an unspecified order, so a row can land on a
    # different page in each query and be skipped (or emitted twice).
    # NOTE(review): rows whose last_updated changes while a paged run is in
    # progress can still shift between pages; the LEFT JOINs on brand and
    # styling_media may also yield several rows per styling_id, which the
    # Elasticsearch output collapses into one document — confirm both.
    statement => "
      with stylings as (
      SELECT
      styling.styling_id,
      'styling' AS type,
       styling.sb_id,
       styling.user_code,
       styling.shop_code,
       styling.style_height,
       styling.styling_category,
       styling.scheduled_at,
       styling.created_at,
       styling.published_at,
       styling.last_updated,
       styling.status,
       styling.is_deleted,
       styling.created_by,
       styling.favorite_count,
       styling.can_display_styling_on_st,
       profile.label_code,
       profile.label_name,
       profile.shop_name,
       profile.user_nickname,
       profile.branch_cd,
       profile.branch_name,
       br.shop_no AS shop_number
      FROM styling_master styling
      LEFT JOIN brand br ON br.label_code = styling.label_code
      LEFT JOIN staff_profile profile ON styling.sb_id = profile.sb_id
      WHERE
      styling.last_updated > :sql_last_value::TIMESTAMP AND styling.last_updated < NOW()
      AND styling.status <> 'draft'
      order by styling.last_updated asc
      ),
      tags as (
      SELECT styling_id,
          string_agg(DISTINCT tag_name, ' >< ') AS tags_styling,
          count(*) AS tag_count
      FROM styling_tag
      where styling_id in (select styling_id from stylings)
      GROUP BY styling_id
      ),
      medias as (
      SELECT styling_id,
          media_type,
          media_org,
          media_thumb,
          media_sm,
          media_md,
          media_lg
      FROM styling_media
      WHERE seq = 1 and styling_id in (select styling_id from stylings)
      ),
      products as (
      SELECT styling_id,
          string_agg(item_cd::varchar, ' >< ') AS item_codes,
          count(*) AS product_count
      FROM styling_product
      where styling_id in (select styling_id from stylings) and is_deleted=false
      GROUP BY styling_id
      )
      select
      stylings.styling_id,
      stylings.type,
      stylings.sb_id,
      stylings.user_code,
      stylings.shop_code,
      stylings.style_height,
      stylings.styling_category,
      stylings.scheduled_at,
      stylings.created_at,
      stylings.published_at,
      stylings.last_updated,
      stylings.status,
      stylings.is_deleted,
      stylings.created_by,
      stylings.favorite_count,
      stylings.can_display_styling_on_st,
      stylings.label_code,
      stylings.label_name,
      stylings.shop_name,
      stylings.user_nickname,
      stylings.shop_number,
      stylings.branch_cd,
      stylings.branch_name,
      tags.tags_styling,
      tags.tag_count,
      medias.media_type,
      medias.media_org,
      medias.media_thumb,
      medias.media_sm,
      medias.media_md,
      medias.media_lg,
      products.item_codes,
      products.product_count
      from
      stylings
      left join tags
      on stylings.styling_id = tags.styling_id
      left join medias on stylings.styling_id = medias.styling_id
      left join products on stylings.styling_id = products.styling_id
    order by stylings.last_updated ASC,
    stylings.styling_id ASC"
    # Incremental tracking: :sql_last_value follows the last_updated column
    # (treated as a timestamp) instead of the run time, and is persisted in the
    # file named by last_run_metadata_path.
    use_column_value => true
    tracking_column => "last_updated"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_styling"
    # Paging: the statement is fetched in pages of jdbc_page_size rows.
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    jdbc_fetch_size => 50000
  }
  jdbc {
    # Cumulations: runs the statement below on the cron schedule "* * * * *"
    # (every minute) and tags every resulting event "cumulation" so the filter
    # and output sections can route it.
    tags => "cumulation"
    # NOTE(review): useCursorFetch looks like a MySQL Connector/J option;
    # presumably the PostgreSQL driver ignores it — confirm.
    jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}?useCursorFetch=true"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    schedule => "* * * * *"
    jdbc_driver_class => "org.postgresql.Driver"
    # Query outline:
    #   - CTE `tag`: distinct tag names per cumulation joined with ' >< ', plus a count.
    #   - CTE `style`: per cumulation_id, a JSON array `media` (seq = 1 media of
    #     published stylings), a JSON array `stylings` (published stylings) and
    #     an array of per-styling item-code strings (from the nested `product` CTE).
    #   - Final SELECT: non-draft cumulation_master rows with last_updated strictly
    #     after :sql_last_value and before NOW(); `media` and `stylings` are cast
    #     to text and parsed back into structures by the json filters.
    #
    # The final ORDER BY ends with cumulation_id as a tie-breaker. With
    # jdbc_paging_enabled the statement is executed as several independent
    # LIMIT/OFFSET queries; ordering by last_updated alone leaves rows that
    # share a timestamp in an unspecified order, so a row can land on a
    # different page in each query and be skipped (or emitted twice).
    statement => "WITH
      tag as (
        select cumulation_id,
        string_agg(distinct tag_name, ' >< ' ) as tags_cumulation,
      count(*) as tag_count
        from cumulation_tag
        group by cumulation_id ),
      style as ( with
      product as (
      select styling_id,
        string_agg(distinct item_cd::varchar, ' >< ') as item_codes,
      count(*) as product_count
        from styling_product
        where is_deleted = false
        group by styling_id
      )
      select
      cumulation_style.cumulation_id,
      COALESCE(
        json_agg(DISTINCT
        jsonb_build_object('id', styling_media.id,
                           'styling_id',
                           styling_media.styling_id,
                           'seq', styling_media.seq,
                           'media_type',
                           styling_media.media_type,
                           'media_org', styling_media.media_org,
                           'media_lg', styling_media.media_lg,
                           'media_md', styling_media.media_md,
                           'media_sm', styling_media.media_sm,
                           'media_thumb',
                           styling_media.media_thumb,
                          'created_at',
                           to_char(styling_media.created_at at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS'),
                           'last_updated',
                           to_char(styling_media.last_updated at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS')
            ))
        FILTER (WHERE styling_media.styling_id IS NOT NULL AND styling_master.status = 'published' AND styling_media.seq =1),
        null) as media,
        COALESCE (
            json_agg(DISTINCT
                jsonb_build_object('id', cumulation_style.id,
                                   'cumulation_id', cumulation_style.cumulation_id,
                                   'styling_id', styling_master.styling_id,
                                   'user_code', styling_master.user_code,
                                   'label_code', styling_master.label_code,
                                   'shop_code', styling_master.shop_code,
                                   'style_height', styling_master.style_height,
                                   'styling_category', styling_master.styling_category,
                                   'status', styling_master.status,
                                   'is_deleted', styling_master.is_deleted,
                                   'media_org', styling_media.media_org,
                                   'media_thumb', styling_media.media_thumb,
                                   'media_sm', styling_media.media_sm,
                                   'media_md', styling_media.media_md,
                                   'media_lg', styling_media.media_lg,
                                   'scheduled_at', to_char(styling_master.scheduled_at, 'YYYY/MM/DD HH24:MI:SS'),
                                   'created_at', to_char(styling_master.created_at at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS'),
                                   'last_updated',to_char(styling_master.last_updated at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS'),
                                   'published_at',to_char(styling_master.published_at at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS')
                    ))
                FILTER (WHERE styling_master.status = 'published'),
                null)  as stylings,
      array_agg(product.item_codes) as item_codes
      from cumulation_style
      left join styling_master on styling_master.styling_id = cumulation_style.styling_id
      left join styling_media on styling_media.styling_id = cumulation_style.styling_id AND styling_media.seq=1
      left join product on product.styling_id = cumulation_style.styling_id
      group by cumulation_id
      )
      select cumulation.cumulation_id,
      'cumulation' as type,
      cumulation.sb_id,
      cumulation.title,
      cumulation.user_code,
      cumulation.shop_code,
      cumulation.category,
      cumulation.scheduled_at,
      cumulation.created_at,
      cumulation.created_by,
      cumulation.last_updated,
      cumulation.status,
      cumulation.can_display_cumulation_on_st,
      cumulation.published_at,
      cumulation.is_deleted,
      cumulation.is_scheduled,
      cumulation.favorite_count,
      cumulation.comment_count,
      cumulation.view_count,
      profile.user_name,
      profile.shop_name,
      profile.label_code,
      profile.label_name,
      profile.user_nickname,
      profile.branch_cd,
      profile.branch_name,
      br.shop_no as shop_number,
      tag.tags_cumulation,
      tag.tag_count,
      style.media :: text,
      style.stylings :: text,
      array_to_string(style.item_codes, ' >< ') as item_codes
      from cumulation_master cumulation
      left join staff_profile profile on cumulation.sb_id = profile.sb_id
      left join brand br on br.label_code = cumulation.label_code
      left join tag on tag.cumulation_id = cumulation.cumulation_id
      left join style on style.cumulation_id = cumulation.cumulation_id
      where
      cumulation.last_updated > :sql_last_value::timestamp AND cumulation.last_updated < NOW()
      and cumulation.status <> 'draft'
    order by cumulation.last_updated ASC,
    cumulation.cumulation_id ASC"
    # Incremental tracking: :sql_last_value follows the last_updated column
    # (treated as a timestamp) instead of the run time, and is persisted in the
    # file named by last_run_metadata_path.
    use_column_value => true
    tracking_column => "last_updated"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_cumulation"
    # Paging: the statement is fetched in pages of jdbc_page_size rows.
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    jdbc_fetch_size => 50000
  }
  jdbc {
    # CS videos: runs the statement below on the cron schedule "* * * * *"
    # (every minute) and tags every resulting event "csvideo" so the output
    # section can route it.
    tags => "csvideo"
    # NOTE(review): useCursorFetch looks like a MySQL Connector/J option;
    # presumably the PostgreSQL driver ignores it — confirm.
    jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}?useCursorFetch=true"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    schedule => "* * * * *"
    jdbc_driver_class => "org.postgresql.Driver"
    # Query outline:
    #   - CTE `csvideos`: non-draft csvideo_master rows with last_updated strictly
    #     after :sql_last_value and before NOW(), joined to brand and staff_profile.
    #   - CTE `tags`: distinct tag names per video joined with ' >< ', plus a count.
    #   - CTE `medias`: distinct (csvideo_id, media_org, media_thumb) for seq = 1.
    #   - CTE `products`: item codes per video joined with ' >< ', plus a count.
    #   - Final SELECT: `csvideos` left-joined to the three aggregates.
    #
    # The final ORDER BY ends with csvideo_id as a tie-breaker. With
    # jdbc_paging_enabled the statement is executed as several independent
    # LIMIT/OFFSET queries; ordering by last_updated alone leaves rows that
    # share a timestamp in an unspecified order, so a row can land on a
    # different page in each query and be skipped (or emitted twice).
    statement => "
      with csvideos as (
      SELECT
      csvideo.csvideo_id,
      'CS Video' AS type,
       csvideo.sb_id,
       csvideo.user_code,
       csvideo.shop_code,
       csvideo.style_height,
       csvideo.styling_category,
       csvideo.scheduled_at,
       csvideo.created_at,
       csvideo.published_at,
       csvideo.last_updated,
       csvideo.status,
       csvideo.is_deleted,
       csvideo.created_by,
       csvideo.favorite_count,
       csvideo.can_display_csvideo_on_st,
       profile.label_code,
       profile.label_name,
       profile.shop_name,
       profile.user_nickname,
       profile.branch_cd,
       profile.branch_name,
       br.shop_no AS shop_number
      FROM csvideo_master csvideo
      LEFT JOIN brand br ON br.label_code = csvideo.label_code
      LEFT JOIN staff_profile profile ON csvideo.sb_id = profile.sb_id
      WHERE
      csvideo.last_updated > :sql_last_value::TIMESTAMP AND csvideo.last_updated < NOW()
      AND csvideo.status <> 'draft'
      ),
      tags as (
      SELECT csvideo_id,
          string_agg(DISTINCT tag_name, ' >< ') AS tags_csvideo,
          count(*) AS tag_count
      FROM csvideo_tag
      where csvideo_id in (select csvideo_id from csvideos)
      GROUP BY csvideo_id
      ),
      medias as (
      SELECT csvideo_id,
          media_org, media_thumb
      FROM csvideo_media
      WHERE seq = 1 and csvideo_id in (select csvideo_id from csvideos)
      GROUP BY csvideo_id,
            media_org, media_thumb
      ),
      products as (
      SELECT csvideo_id,
          string_agg(item_cd::varchar, ' >< ') AS item_codes,
          count(*) AS product_count
      FROM csvideo_product
      where csvideo_id in (select csvideo_id from csvideos)
      GROUP BY csvideo_id
      )
      select
      csvideos.csvideo_id,
      csvideos.type,
      csvideos.sb_id,
      csvideos.user_code,
      csvideos.shop_code,
      csvideos.style_height,
      csvideos.styling_category,
      csvideos.scheduled_at,
      csvideos.created_at,
      csvideos.published_at,
      csvideos.last_updated,
      csvideos.status,
      csvideos.is_deleted,
      csvideos.created_by,
      csvideos.favorite_count,
      csvideos.can_display_csvideo_on_st,
      csvideos.label_code,
      csvideos.label_name,
      csvideos.shop_name,
      csvideos.user_nickname,
      csvideos.shop_number,
      csvideos.branch_cd,
      csvideos.branch_name,
      tags.tags_csvideo,
      tags.tag_count,
      medias.media_org,
      medias.media_thumb,
      products.item_codes,
      products.product_count
      from
      csvideos
      left join tags
      on csvideos.csvideo_id = tags.csvideo_id
      left join medias on csvideos.csvideo_id = medias.csvideo_id
      left join products on csvideos.csvideo_id = products.csvideo_id
    order by csvideos.last_updated ASC,
    csvideos.csvideo_id ASC"
    # Incremental tracking: :sql_last_value follows the last_updated column
    # (treated as a timestamp) instead of the run time, and is persisted in the
    # file named by last_run_metadata_path.
    use_column_value => true
    tracking_column => "last_updated"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_csvideo"
    # Paging: the statement is fetched in pages of jdbc_page_size rows.
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    jdbc_fetch_size => 50000
  }

  jdbc {
    # Coordinate list: runs the statement below on the cron schedule "* * * * *"
    # (every minute) and tags every resulting event "coordinate_list" so the
    # filter and output sections can route it.
    tags => "coordinate_list"
    jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    schedule => "* * * * *"
    jdbc_driver_class => "org.postgresql.Driver"
    # Query outline:
    #   - Inner SELECT: published, non-deleted denormalized_style rows that have
    #     product_ids and can_display_styling_on_st = 'On', joined to active
    #     denormalized_staff rows, with last_updated strictly after
    #     :sql_last_value and before NOW(). Staff gender, height, shop and age
    #     are blanked/zeroed when the matching can_display_* flag is not true.
    #   - `products`: a JSON array (as text) built by a correlated subquery over
    #     denormalized_product_details / denormalized_item_category.
    #   - Outer SELECT: drops rows whose `products` is NULL.
    #
    # The final ORDER BY ends with cid (styling_id as text) as a tie-breaker.
    # With jdbc_paging_enabled the statement is executed as several independent
    # LIMIT/OFFSET queries; ordering by last_updated alone leaves rows that
    # share a timestamp in an unspecified order, so a row can land on a
    # different page in each query and be skipped (or emitted twice).
    # NOTE(review): json_build_object below lists 'size', 'size_code', 'color'
    # and 'color_code' twice; which value survives JSON parsing should be
    # confirmed.
    statement => "SELECT *
        from (
                select ds.styling_id::text                          as cid,
                        coordinate_main_image_url::text              as image_url,
                        ds.created_at::text,
                        ds.published_at::text                        as accept_at,
                        accept::text,
                        ds.style_height_value::text                  as coordinate_height,
                        ds.coordinate_genre::text                    as coordinate_genre,
                        team_post::text,
                        ds.sb_id::text                               as user_id,
                        dstaff.user_code::text,
                        user_name::text,
                        user_name_kana::text,
                        image_url_src::text                          as user_image_url,
                        case
                            when dstaff.can_display_gender is true then dstaff.user_gender
                            else '0'::text end                       as user_gender,
                        case
                            when dstaff.can_display_user_height is true then dstaff.user_height_value
                            else '0'::text end                       as user_height,
                        case
                            when dstaff.can_display_store is true then dstaff.shop_id
                            else ''::text end                        as shop_id,
                        case
                            when dstaff.can_display_store is true then dstaff.shop_name
                            else ''::text end                        as shop_name,
                        case
                            when dstaff.can_display_store is true then dstaff.shop_code
                            else ''::text end                        as shop_code,
                        ds.view_count::text                          as pv,
                        case
                            when dstaff.can_display_date_of_birth is true
                                then coalesce(date_part('year', age(dstaff.dob))::text, '0'::text)
                            else '0'::text end                       as user_age,
                        dstaff.front_display_label_code              as label_code,
                        dstaff.front_display_label_name              as label_name,
                        tags,
                        crowned_staff,
                        (
                            select json_agg(
                                        json_build_object(
                                                'cid', dpd.item_cd::text,
                                                'product_code', RIGHT(dpd.goods_stk_no::text, 10),
                                                'jan_code', null::text,
                                                'name', dpd.item_nm::text,
                                                'base_product_code', RIGHT(dpd.item_cd::text, 6),
                                                'label', dpd.shop_nm::text,
                                                'label_code', dpd.label_code::text,
                                                'category', dic.category_name::text,
                                                'category_code', dic.category_code::text,
                                                'color', dpd.color_nm::text,
                                                'color_code', dpd.color_cd::text,
                                                'size', dpd.size_nm::text,
                                                'size_code', dpd.size_cd::text,
                                                'image_url', dpd.color_big_img::text,
                                                'genre_code', dic.genre_code::text,
                                                'size', dpd.size_nm,
                                                'size_code', dpd.size_cd,
                                                'search_size_cd', dpd.search_size_cd,
                                                'color', dpd.color_nm,
                                                'color_code', dpd.color_cd,
                                                'search_color_cd', dpd.search_color_cd,
                                                'product_url', case
                                                                    when dpd.shop_url is not null and
                                                                        dpd.item_cd is not null and
                                                                        dpd.color_cd is not null and
                                                                        dpd.brand_nm_short is not null
                                                                        and dpd.goods_stk_no is not null
                                                                        then concat(dpd.shop_url,
                                                                                    'disp/CSfGoodsPage_001.jsp?ITEM_CD=',
                                                                                    dpd.item_cd,
                                                                                    '&COLOR_CD=', dpd.color_cd,
                                                                                    '&site=staff-board&sb_br=',
                                                                                    dpd.brand_nm_short,
                                                                                    '&sb_sku=', dpd.goods_stk_no)
                                                    end,
                                                'is_public', dpd.is_item_active::text
                                            )
                                    )
                            from denormalized_product_details dpd
                                    INNER JOIN denormalized_item_category dic on dpd.item_cd = dic.item_cd
                            where dpd.goods_stk_no = any (ds.product_ids)
                        )::text                                      as products,
                        json_build_array((resized_contents ->> 0)::json)::text AS resized_contents,
                        include_video,
                        dstaff.user_resized_images::text,
                        ds.last_updated,
                        -- Fields to be added
                        ds.comment,
                        -- Columns to Sort
                        ds.view_count,
                        ds.published_at,
                        ds.sales_amount,
                        -- Conditions but not send in response
                        dstaff.shop_code,
                        dstaff.sb_id,
                        dstaff.user_code,
                        dstaff.is_team
                FROM denormalized_style ds
                        INNER JOIN denormalized_staff dstaff
                                    ON dstaff.sb_id = ds.sb_id AND dstaff.status = 'active'
                WHERE ds.status = 'published'
                AND ds.is_deleted = FALSE
                AND ds.product_ids IS NOT NULL
                AND can_display_styling_on_st = 'On'
                AND ds.last_updated > :sql_last_value::TIMESTAMP
                AND ds.last_updated < NOW()
            ) as data
        WHERE data.products is not null
        order by data.last_updated ASC,
        data.cid ASC"
    # Incremental tracking: :sql_last_value follows the last_updated column
    # (treated as a timestamp) instead of the run time, and is persisted in the
    # file named by last_run_metadata_path.
    use_column_value => true
    tracking_column => "last_updated"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_coordinate_list"
    # Paging: the statement is fetched in pages of jdbc_page_size rows.
    jdbc_paging_enabled => true
    jdbc_page_size => 5000
    jdbc_fetch_size => 5000
  }
}

filter {
  # Cumulation events carry `media` and `stylings` as JSON text (the query
  # casts them to text); parse each one and store the result back in the
  # same field.
  if "cumulation" in [tags] {
    json { source => "media"    target => "media" }
    json { source => "stylings" target => "stylings" }
  }

  # Coordinate-list events carry three JSON-text fields; parse each one and
  # store the result back in the same field.
  if "coordinate_list" in [tags] {
    json { source => "products"            target => "products" }
    json { source => "resized_contents"    target => "resized_contents" }
    json { source => "user_resized_images" target => "user_resized_images" }
  }
}

output {
  # Each event is routed by the tag its jdbc input assigned. Every branch
  # writes to the same cluster with the same credentials; only the target
  # index and the field used as the document id differ. Because document_id
  # is set, events that share an id overwrite one another in the index.

  # Staff profiles, keyed by sb_id.
  if "staff" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      ssl         => true
      ilm_enabled => false
      index       => "${ELASTIC_STAFF_INDEX}"
      document_id => "%{sb_id}"
    }
  }

  # Styling posts, keyed by styling_id.
  if "styling" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      ssl         => true
      ilm_enabled => false
      index       => "${ELASTIC_STYLING_INDEX}"
      document_id => "%{styling_id}"
    }
  }

  # Cumulations, keyed by cumulation_id.
  if "cumulation" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      ssl         => true
      ilm_enabled => false
      index       => "${ELASTIC_CUMULATION_INDEX}"
      document_id => "%{cumulation_id}"
    }
  }

  # CS videos, keyed by csvideo_id.
  if "csvideo" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      ssl         => true
      ilm_enabled => false
      index       => "${ELASTIC_CSVIDEO_INDEX}"
      document_id => "%{csvideo_id}"
    }
  }

  # Coordinate list, keyed by cid.
  if "coordinate_list" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      ssl         => true
      ilm_enabled => false
      index       => "${ELASTIC_COORDINATE_LIST_INDEX}"
      document_id => "%{cid}"
    }
  }
}

I have the configuration file shown above. The database query for the input tagged styling (tags => "styling") is not inserting all of its data into the index. I ran the query manually in a DB client and the row count is what we expect. However, when Logstash runs the same query, some of the data is missing from the index, and I have no idea why.
That query returns more than 900000 rows; close to 20000 of them never reach the Elasticsearch index.

What would be the ideal CPU and memory specs for Logstash and Elasticsearch? At the moment we are using 2GB CPU and 4GB memory for Logstash, and 4GB CPU and 16GB RAM for Elasticsearch. Could this be causing the issue?

I also tried connecting to the RDS database from a local Logstash instance and inserting into a local Elasticsearch index; that seems to load all the data correctly.
Any help is appreciated.

The most likely reason I can think of is that document_id => "%{styling_id}" is causing documents to be overwritten because styling_id is not unique in the query result. Try creating a new index and removing that option so that every event creates its own document.

Another option is to change the styling output so that it also writes one line per event to a file:

if "styling" in [tags] {
    file { path => "/tmp/count.txt" codec => line { format => "#" } }
    elasticsearch {

and, when the run has finished, run wc -l /tmp/count.txt to count the events Logstash actually emitted.


@Badger styling_id is a unique primary key.

I am certain you believe that to be true. I am suggesting that you run an experiment to confirm it, and I have suggested two ways to do that.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.