input {
jdbc {
# Staff profiles. Events are tagged "staff"; the output section routes them by this tag.
tags => "staff"

# ---- connection ----
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}"
jdbc_user => "${DB_USER}"
jdbc_password => "${DB_PASSWORD}"

# ---- incremental sync ----
# :sql_last_value in the statement is the last_updated value of the last row
# processed by the previous run (the largest, because the statement orders ascending).
use_column_value => true
tracking_column => "last_updated"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_staff_profile"

# ---- paging ----
# Paging re-runs the statement per page; the ORDER BY ends with the sb_id tie-breaker
# so consecutive pages see rows in the same order.
jdbc_paging_enabled => true
jdbc_page_size => 10000
jdbc_fetch_size => 10000

# ---- schedule: every minute ----
schedule => "* * * * *"

statement => "WITH sns AS
(SELECT sb_id,
json_agg(json_build_object('sns_type', sns_type, 'approval_status', approval_status, 'sns_handle', sns_handle))::varchar AS sns_approval_status,
max(last_updated) AS last_updated
FROM staff_sns
GROUP BY sb_id),
interest AS
(WITH cte AS
(SELECT i.sb_id,
m.label_jp AS interest,
json_agg(i.tag_name) AS tags_staff
FROM staff_interest i
LEFT JOIN master_interests m ON i.interest_id = m.code
GROUP BY i.sb_id,
m.label_jp) SELECT sb_id,
json_agg(json_build_object('interest', interest, 'tags_staff', tags_staff))::varchar AS interests
FROM cte
GROUP BY cte.sb_id)
SELECT profile.sb_id,
profile.user_code,
profile.store_account_id,
profile.account_type,
profile.user_name,
profile.user_name_kana,
profile.user_nickname,
(SELECT label_jp AS user_gender FROM master_gender WHERE code = profile.user_gender LIMIT 1),
profile.can_display_gender,
profile.bio,
profile.label_name_kana,
profile.label_code,
profile.label_name,
profile.shop_code,
profile.shop_name,
profile.shop_name_kana,
profile.front_display_label_code,
profile.front_display_label_name,
profile.can_display_front_label,
profile.can_display_store,
profile.store_prefecture_code,
profile.sb_role_id,
profile.sb_permission_id,
profile.branch_cd,
profile.branch_name,
profile.dob,
(SELECT label_jp AS user_height FROM master_height WHERE code = profile.user_height LIMIT 1),
profile.can_display_user_height,
profile.can_display_age,
(SELECT label_jp AS user_weight FROM master_weight WHERE code = profile.user_weight LIMIT 1),
profile.can_display_user_weight,
(SELECT label_jp AS user_shoe_size FROM master_shoe_size WHERE code = profile.user_shoe_size LIMIT 1),
profile.can_display_shoe_size,
(SELECT label_jp AS user_top_size FROM master_top_size WHERE code = profile.user_top_size LIMIT 1),
(SELECT label_jp AS user_bottom_size FROM master_bottom_size WHERE code = profile.user_bottom_size LIMIT 1),
profile.can_display_user_bottom_size,
profile.can_display_size,
(SELECT label_jp AS user_body_shape FROM master_body_shape WHERE code = profile.user_body_shape LIMIT 1),
profile.can_display_body_shape,
(SELECT label_jp AS body_characteristics FROM master_body_characteristics WHERE code = profile.body_characteristics LIMIT 1),
profile.can_display_body_characteristics,
(SELECT label_jp AS shoulder_stature FROM master_shoulder_stature WHERE code = profile.shoulder_stature LIMIT 1),
profile.can_display_shoulder_stature,
(SELECT label_jp AS personal_color FROM master_personal_colour WHERE code = profile.personal_color LIMIT 1),
(SELECT label_jp AS fav_styling_1
FROM master_favorite_styling
WHERE code = profile.fav_styling_1
LIMIT 1),
(SELECT label_jp AS fav_styling_2
FROM master_favorite_styling
WHERE code = profile.fav_styling_2
LIMIT 1),
(SELECT label_jp AS fav_styling_3
FROM master_favorite_styling
WHERE code = profile.fav_styling_3
LIMIT 1), profile.have_child,
profile.can_display_have_child,
profile.status,
profile.created_at,
profile.last_updated,
profile.last_updated_bff,
profile.last_updated_stm,
profile.age_range,
profile.weight_range,
profile.coubic_url,
profile.can_display_profile_on_st,
profile.can_display_user_name,
profile.can_display_bio,
profile.can_display_date_of_birth,
profile.can_display_user_top_size,
profile.can_display_personal_color,
profile.can_display_fav_styling,
profile.can_display_interest,
profile.crowned_staff,
profile.follower_count,
profile.store_visit,
profile.styling_count,
profile.csvideo_count,
profile.cumulation_count,
sbp.approval_required AS is_post_approval_required,
sr.role_name,
spi.image_url_lg AS user_image_url,
sns.sns_approval_status,
br.shop_no AS shop_number,
interest.interests AS interests
FROM staff_profile profile
LEFT JOIN brand br ON profile.label_code = br.label_code
LEFT JOIN sb_permission sbp ON profile.sb_permission_id = sbp.id
LEFT JOIN sb_role sr ON profile.sb_role_id = sr.id
LEFT JOIN staff_profile_image spi ON profile.sb_id = spi.sb_id
LEFT JOIN sns ON sns.sb_id = profile.sb_id
LEFT JOIN interest ON interest.sb_id = profile.sb_id
WHERE profile.last_updated > :sql_last_value::TIMESTAMP AND profile.last_updated < NOW()
ORDER BY profile.last_updated ASC,
profile.sb_id ASC"
}
jdbc {
# Styling posts. Events are tagged "styling"; the output section routes them by this tag.
tags => "styling"
# NOTE(review): useCursorFetch is a MySQL Connector/J URL property. PgJDBC does not
# document it, so it most likely has no effect here. Confirm before relying on it.
jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}?useCursorFetch=true"
jdbc_user => "${DB_USER}"
jdbc_password => "${DB_PASSWORD}"
schedule => "* * * * *"
jdbc_driver_class => "org.postgresql.Driver"
# With jdbc_paging_enabled the plugin re-runs this statement once per page using
# LIMIT/OFFSET, so the final ORDER BY must be a total order. Ordering by last_updated
# alone lets PostgreSQL return rows that share a last_updated value in a different order
# on each page query. Rows near page boundaries can then appear twice or not at all.
# styling_id is added as a tie-breaker (the staff input does the same with sb_id).
# The ORDER BY that used to sit inside the stylings CTE did not affect the final row
# order and has been removed.
# NOTE(review): every page is a separate query, so NOW() and the data are re-read per
# page. A row whose last_updated changes while a run is paging can still shift OFFSET
# positions for the rows after it.
# NOTE(review): if last_updated is `timestamp without time zone`, check that the
# Logstash JVM timezone (or jdbc_default_timezone) matches the zone the column is
# written in. Otherwise sql_last_value is bound with a different offset, which could
# differ between a local Logstash and the deployed one.
statement => "
with stylings as (
SELECT
styling.styling_id,
'styling' AS type,
styling.sb_id,
styling.user_code,
styling.shop_code,
styling.style_height,
styling.styling_category,
styling.scheduled_at,
styling.created_at,
styling.published_at,
styling.last_updated,
styling.status,
styling.is_deleted,
styling.created_by,
styling.favorite_count,
styling.can_display_styling_on_st,
profile.label_code,
profile.label_name,
profile.shop_name,
profile.user_nickname,
profile.branch_cd,
profile.branch_name,
br.shop_no AS shop_number
FROM styling_master styling
LEFT JOIN brand br ON br.label_code = styling.label_code
LEFT JOIN staff_profile profile ON styling.sb_id = profile.sb_id
WHERE
styling.last_updated > :sql_last_value::TIMESTAMP AND styling.last_updated < NOW()
AND styling.status <> 'draft'
),
tags as (
SELECT styling_id,
string_agg(DISTINCT tag_name, ' >< ') AS tags_styling,
count(*) AS tag_count
FROM styling_tag
where styling_id in (select styling_id from stylings)
GROUP BY styling_id
),
medias as (
SELECT styling_id,
media_type,
media_org,
media_thumb,
media_sm,
media_md,
media_lg
FROM styling_media
WHERE seq = 1 and styling_id in (select styling_id from stylings)
),
products as (
SELECT styling_id,
string_agg(item_cd::varchar, ' >< ') AS item_codes,
count(*) AS product_count
FROM styling_product
where styling_id in (select styling_id from stylings) and is_deleted=false
GROUP BY styling_id
)
select
stylings.styling_id,
stylings.type,
stylings.sb_id,
stylings.user_code,
stylings.shop_code,
stylings.style_height,
stylings.styling_category,
stylings.scheduled_at,
stylings.created_at,
stylings.published_at,
stylings.last_updated,
stylings.status,
stylings.is_deleted,
stylings.created_by,
stylings.favorite_count,
stylings.can_display_styling_on_st,
stylings.label_code,
stylings.label_name,
stylings.shop_name,
stylings.user_nickname,
stylings.shop_number,
stylings.branch_cd,
stylings.branch_name,
tags.tags_styling,
tags.tag_count,
medias.media_type,
medias.media_org,
medias.media_thumb,
medias.media_sm,
medias.media_md,
medias.media_lg,
products.item_codes,
products.product_count
from
stylings
left join tags
on stylings.styling_id = tags.styling_id
left join medias on stylings.styling_id = medias.styling_id
left join products on stylings.styling_id = products.styling_id
order by stylings.last_updated ASC, stylings.styling_id ASC"
# :sql_last_value is the last_updated value of the last row processed by the previous run.
use_column_value => true
tracking_column => "last_updated"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_styling"
jdbc_paging_enabled => true
jdbc_page_size => 50000
jdbc_fetch_size => 50000
}
jdbc {
# Cumulations. Events are tagged "cumulation"; the filter section parses the media and
# stylings JSON text, and the output section routes events by this tag.
tags => "cumulation"
# NOTE(review): useCursorFetch is a MySQL Connector/J URL property. PgJDBC does not
# document it, so it most likely has no effect here. Confirm before relying on it.
jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}?useCursorFetch=true"
jdbc_user => "${DB_USER}"
jdbc_password => "${DB_PASSWORD}"
schedule => "* * * * *"
jdbc_driver_class => "org.postgresql.Driver"
# Paging re-runs this statement per page with LIMIT/OFFSET, so the final ORDER BY needs
# a unique tie-breaker. Without cumulation_id, rows sharing a last_updated value can be
# ordered differently on each page query, and rows at page boundaries can be skipped
# or duplicated.
# NOTE(review): the tag and style CTEs aggregate all of cumulation_tag/cumulation_style
# with no filter, and that work is repeated for every page. Restricting them to the
# cumulation_ids in the current window (as the styling input does) may be much cheaper.
statement => "WITH
tag as (
select cumulation_id,
string_agg(distinct tag_name, ' >< ' ) as tags_cumulation,
count(*) as tag_count
from cumulation_tag
group by cumulation_id ),
style as ( with
product as (
select styling_id,
string_agg(distinct item_cd::varchar, ' >< ') as item_codes,
count(*) as product_count
from styling_product
where is_deleted = false
group by styling_id
)
select
cumulation_style.cumulation_id,
COALESCE(
json_agg(DISTINCT
jsonb_build_object('id', styling_media.id,
'styling_id',
styling_media.styling_id,
'seq', styling_media.seq,
'media_type',
styling_media.media_type,
'media_org', styling_media.media_org,
'media_lg', styling_media.media_lg,
'media_md', styling_media.media_md,
'media_sm', styling_media.media_sm,
'media_thumb',
styling_media.media_thumb,
'created_at',
to_char(styling_media.created_at at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS'),
'last_updated',
to_char(styling_media.last_updated at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS')
))
FILTER (WHERE styling_media.styling_id IS NOT NULL AND styling_master.status = 'published' AND styling_media.seq =1),
null) as media,
COALESCE (
json_agg(DISTINCT
jsonb_build_object('id', cumulation_style.id,
'cumulation_id', cumulation_style.cumulation_id,
'styling_id', styling_master.styling_id,
'user_code', styling_master.user_code,
'label_code', styling_master.label_code,
'shop_code', styling_master.shop_code,
'style_height', styling_master.style_height,
'styling_category', styling_master.styling_category,
'status', styling_master.status,
'is_deleted', styling_master.is_deleted,
'media_org', styling_media.media_org,
'media_thumb', styling_media.media_thumb,
'media_sm', styling_media.media_sm,
'media_md', styling_media.media_md,
'media_lg', styling_media.media_lg,
'scheduled_at', to_char(styling_master.scheduled_at, 'YYYY/MM/DD HH24:MI:SS'),
'created_at', to_char(styling_master.created_at at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS'),
'last_updated',to_char(styling_master.last_updated at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS'),
'published_at',to_char(styling_master.published_at at time zone 'JST', 'YYYY/MM/DD HH24:MI:SS')
))
FILTER (WHERE styling_master.status = 'published'),
null) as stylings,
array_agg(product.item_codes) as item_codes
from cumulation_style
left join styling_master on styling_master.styling_id = cumulation_style.styling_id
left join styling_media on styling_media.styling_id = cumulation_style.styling_id AND styling_media.seq=1
left join product on product.styling_id = cumulation_style.styling_id
group by cumulation_id
)
select cumulation.cumulation_id,
'cumulation' as type,
cumulation.sb_id,
cumulation.title,
cumulation.user_code,
cumulation.shop_code,
cumulation.category,
cumulation.scheduled_at,
cumulation.created_at,
cumulation.created_by,
cumulation.last_updated,
cumulation.status,
cumulation.can_display_cumulation_on_st,
cumulation.published_at,
cumulation.is_deleted,
cumulation.is_scheduled,
cumulation.favorite_count,
cumulation.comment_count,
cumulation.view_count,
profile.user_name,
profile.shop_name,
profile.label_code,
profile.label_name,
profile.user_nickname,
profile.branch_cd,
profile.branch_name,
br.shop_no as shop_number,
tag.tags_cumulation,
tag.tag_count,
style.media :: text,
style.stylings :: text,
array_to_string(style.item_codes, ' >< ') as item_codes
from cumulation_master cumulation
left join staff_profile profile on cumulation.sb_id = profile.sb_id
left join brand br on br.label_code = cumulation.label_code
left join tag on tag.cumulation_id = cumulation.cumulation_id
left join style on style.cumulation_id = cumulation.cumulation_id
where
cumulation.last_updated > :sql_last_value::timestamp AND cumulation.last_updated < NOW()
and cumulation.status <> 'draft'
order by cumulation.last_updated ASC, cumulation.cumulation_id ASC"
# :sql_last_value is the last_updated value of the last row processed by the previous run.
use_column_value => true
tracking_column => "last_updated"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_cumulation"
jdbc_paging_enabled => true
jdbc_page_size => 50000
jdbc_fetch_size => 50000
}
jdbc {
# CS videos. Events are tagged "csvideo"; the output section routes them by this tag.
tags => "csvideo"
# NOTE(review): useCursorFetch is a MySQL Connector/J URL property. PgJDBC does not
# document it, so it most likely has no effect here. Confirm before relying on it.
jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}?useCursorFetch=true"
jdbc_user => "${DB_USER}"
jdbc_password => "${DB_PASSWORD}"
schedule => "* * * * *"
jdbc_driver_class => "org.postgresql.Driver"
# Paging re-runs this statement per page with LIMIT/OFFSET, so the final ORDER BY needs
# a unique tie-breaker. Without csvideo_id, rows sharing a last_updated value can be
# ordered differently on each page query, and rows at page boundaries can be skipped
# or duplicated.
statement => "
with csvideos as (
SELECT
csvideo.csvideo_id,
'CS Video' AS type,
csvideo.sb_id,
csvideo.user_code,
csvideo.shop_code,
csvideo.style_height,
csvideo.styling_category,
csvideo.scheduled_at,
csvideo.created_at,
csvideo.published_at,
csvideo.last_updated,
csvideo.status,
csvideo.is_deleted,
csvideo.created_by,
csvideo.favorite_count,
csvideo.can_display_csvideo_on_st,
profile.label_code,
profile.label_name,
profile.shop_name,
profile.user_nickname,
profile.branch_cd,
profile.branch_name,
br.shop_no AS shop_number
FROM csvideo_master csvideo
LEFT JOIN brand br ON br.label_code = csvideo.label_code
LEFT JOIN staff_profile profile ON csvideo.sb_id = profile.sb_id
WHERE
csvideo.last_updated > :sql_last_value::TIMESTAMP AND csvideo.last_updated < NOW()
AND csvideo.status <> 'draft'
),
tags as (
SELECT csvideo_id,
string_agg(DISTINCT tag_name, ' >< ') AS tags_csvideo,
count(*) AS tag_count
FROM csvideo_tag
where csvideo_id in (select csvideo_id from csvideos)
GROUP BY csvideo_id
),
medias as (
SELECT csvideo_id,
media_org, media_thumb
FROM csvideo_media
WHERE seq = 1 and csvideo_id in (select csvideo_id from csvideos)
GROUP BY csvideo_id,
media_org, media_thumb
),
products as (
SELECT csvideo_id,
string_agg(item_cd::varchar, ' >< ') AS item_codes,
count(*) AS product_count
FROM csvideo_product
where csvideo_id in (select csvideo_id from csvideos)
GROUP BY csvideo_id
)
select
csvideos.csvideo_id,
csvideos.type,
csvideos.sb_id,
csvideos.user_code,
csvideos.shop_code,
csvideos.style_height,
csvideos.styling_category,
csvideos.scheduled_at,
csvideos.created_at,
csvideos.published_at,
csvideos.last_updated,
csvideos.status,
csvideos.is_deleted,
csvideos.created_by,
csvideos.favorite_count,
csvideos.can_display_csvideo_on_st,
csvideos.label_code,
csvideos.label_name,
csvideos.shop_name,
csvideos.user_nickname,
csvideos.shop_number,
csvideos.branch_cd,
csvideos.branch_name,
tags.tags_csvideo,
tags.tag_count,
medias.media_org,
medias.media_thumb,
products.item_codes,
products.product_count
from
csvideos
left join tags
on csvideos.csvideo_id = tags.csvideo_id
left join medias on csvideos.csvideo_id = medias.csvideo_id
left join products on csvideos.csvideo_id = products.csvideo_id
order by csvideos.last_updated ASC, csvideos.csvideo_id ASC"
# :sql_last_value is the last_updated value of the last row processed by the previous run.
use_column_value => true
tracking_column => "last_updated"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_csvideo"
jdbc_paging_enabled => true
jdbc_page_size => 50000
jdbc_fetch_size => 50000
}
jdbc {
# Coordinate list. Events are tagged "coordinate_list"; the filter section parses the
# products, resized_contents and user_resized_images JSON text, and the output section
# routes events by this tag.
tags => "coordinate_list"
jdbc_connection_string => "jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}"
jdbc_user => "${DB_USER}"
jdbc_password => "${DB_PASSWORD}"
schedule => "* * * * *"
jdbc_driver_class => "org.postgresql.Driver"
# Paging re-runs this statement per page with LIMIT/OFFSET, so the final ORDER BY needs
# a unique tie-breaker. Without cid, rows sharing a last_updated value can be ordered
# differently on each page query, and rows at page boundaries can be skipped or
# duplicated.
# NOTE(review): the derived table has two columns named shop_code (the masked CASE and
# the raw dstaff.shop_code) and two named user_code. Only one value per name can
# survive in the Logstash event; confirm which one the index is meant to hold.
# NOTE(review): the products JSON object repeats the keys 'size', 'size_code', 'color'
# and 'color_code'. Confirm which occurrence the JSON parser in the filter section keeps.
statement => "SELECT *
from (
select ds.styling_id::text as cid,
coordinate_main_image_url::text as image_url,
ds.created_at::text,
ds.published_at::text as accept_at,
accept::text,
ds.style_height_value::text as coordinate_height,
ds.coordinate_genre::text as coordinate_genre,
team_post::text,
ds.sb_id::text as user_id,
dstaff.user_code::text,
user_name::text,
user_name_kana::text,
image_url_src::text as user_image_url,
case
when dstaff.can_display_gender is true then dstaff.user_gender
else '0'::text end as user_gender,
case
when dstaff.can_display_user_height is true then dstaff.user_height_value
else '0'::text end as user_height,
case
when dstaff.can_display_store is true then dstaff.shop_id
else ''::text end as shop_id,
case
when dstaff.can_display_store is true then dstaff.shop_name
else ''::text end as shop_name,
case
when dstaff.can_display_store is true then dstaff.shop_code
else ''::text end as shop_code,
ds.view_count::text as pv,
case
when dstaff.can_display_date_of_birth is true
then coalesce(date_part('year', age(dstaff.dob))::text, '0'::text)
else '0'::text end as user_age,
dstaff.front_display_label_code as label_code,
dstaff.front_display_label_name as label_name,
tags,
crowned_staff,
(
select json_agg(
json_build_object(
'cid', dpd.item_cd::text,
'product_code', RIGHT(dpd.goods_stk_no::text, 10),
'jan_code', null::text,
'name', dpd.item_nm::text,
'base_product_code', RIGHT(dpd.item_cd::text, 6),
'label', dpd.shop_nm::text,
'label_code', dpd.label_code::text,
'category', dic.category_name::text,
'category_code', dic.category_code::text,
'color', dpd.color_nm::text,
'color_code', dpd.color_cd::text,
'size', dpd.size_nm::text,
'size_code', dpd.size_cd::text,
'image_url', dpd.color_big_img::text,
'genre_code', dic.genre_code::text,
'size', dpd.size_nm,
'size_code', dpd.size_cd,
'search_size_cd', dpd.search_size_cd,
'color', dpd.color_nm,
'color_code', dpd.color_cd,
'search_color_cd', dpd.search_color_cd,
'product_url', case
when dpd.shop_url is not null and
dpd.item_cd is not null and
dpd.color_cd is not null and
dpd.brand_nm_short is not null
and dpd.goods_stk_no is not null
then concat(dpd.shop_url,
'disp/CSfGoodsPage_001.jsp?ITEM_CD=',
dpd.item_cd,
'&COLOR_CD=', dpd.color_cd,
'&site=staff-board&sb_br=',
dpd.brand_nm_short,
'&sb_sku=', dpd.goods_stk_no)
end,
'is_public', dpd.is_item_active::text
)
)
from denormalized_product_details dpd
INNER JOIN denormalized_item_category dic on dpd.item_cd = dic.item_cd
where dpd.goods_stk_no = any (ds.product_ids)
)::text as products,
json_build_array((resized_contents ->> 0)::json)::text AS resized_contents,
include_video,
dstaff.user_resized_images::text,
ds.last_updated,
-- Fields to be added
ds.comment,
-- Columns to Sort
ds.view_count,
ds.published_at,
ds.sales_amount,
-- Conditions but not send in response
dstaff.shop_code,
dstaff.sb_id,
dstaff.user_code,
dstaff.is_team
FROM denormalized_style ds
INNER JOIN denormalized_staff dstaff
ON dstaff.sb_id = ds.sb_id AND dstaff.status = 'active'
WHERE ds.status = 'published'
AND ds.is_deleted = FALSE
AND ds.product_ids IS NOT NULL
AND can_display_styling_on_st = 'On'
AND ds.last_updated > :sql_last_value::TIMESTAMP
AND ds.last_updated < NOW()
) as data
WHERE data.products is not null
order by data.last_updated ASC, data.cid ASC"
# :sql_last_value is the last_updated value of the last row processed by the previous run.
use_column_value => true
tracking_column => "last_updated"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/logstash_jdbc_last_run_coordinate_list"
jdbc_paging_enabled => true
jdbc_page_size => 5000
jdbc_fetch_size => 5000
}
}
filter {
  # Cumulation rows carry media and stylings as JSON text (cast with ::text in the SQL).
  # Parse each back into a structured field of the same name.
  if "cumulation" in [tags] {
    json {
      source => "media"
      target => "media"
    }
    json {
      source => "stylings"
      target => "stylings"
    }
  }

  # Coordinate-list rows carry three JSON-text columns; parse each in place.
  if "coordinate_list" in [tags] {
    json {
      source => "products"
      target => "products"
    }
    json {
      source => "resized_contents"
      target => "resized_contents"
    }
    json {
      source => "user_resized_images"
      target => "user_resized_images"
    }
  }
}
output {
  # One elasticsearch output per source input. The event's tag selects the index, and
  # document_id is the row's id column, so a re-read row replaces its existing document
  # (default index action) instead of creating a new one.

  # Staff profiles, keyed by sb_id.
  if "staff" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      ssl         => true
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      index       => "${ELASTIC_STAFF_INDEX}"
      document_id => "%{sb_id}"
      ilm_enabled => false
    }
  }

  # Styling posts, keyed by styling_id.
  if "styling" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      ssl         => true
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      index       => "${ELASTIC_STYLING_INDEX}"
      document_id => "%{styling_id}"
      ilm_enabled => false
    }
  }

  # Cumulations, keyed by cumulation_id.
  if "cumulation" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      ssl         => true
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      index       => "${ELASTIC_CUMULATION_INDEX}"
      document_id => "%{cumulation_id}"
      ilm_enabled => false
    }
  }

  # CS videos, keyed by csvideo_id.
  if "csvideo" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      ssl         => true
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      index       => "${ELASTIC_CSVIDEO_INDEX}"
      document_id => "%{csvideo_id}"
      ilm_enabled => false
    }
  }

  # Coordinate list, keyed by cid.
  if "coordinate_list" in [tags] {
    elasticsearch {
      hosts       => ["${ELASTIC_HOST}"]
      ssl         => true
      user        => "${ELASTIC_USER_NAME}"
      password    => "${ELASTIC_PASSWORD}"
      index       => "${ELASTIC_COORDINATE_LIST_INDEX}"
      document_id => "%{cid}"
      ilm_enabled => false
    }
  }
}
I have the configuration file shown above. The JDBC input tagged `styling` (`tags => "styling"`) does not index all of its rows into Elasticsearch. When I run the same query manually in a database client, the row count matches what we expect. However, when Logstash runs the query, some rows are never inserted, and I cannot tell why.
The query returns more than 900,000 rows, and close to 20,000 of them never appear in the Elasticsearch index.
What CPU and memory specifications are recommended for Logstash and Elasticsearch? Currently Logstash runs with 2 vCPUs and 4 GB of memory, and Elasticsearch with 4 vCPUs and 16 GB of RAM. Could this be causing the issue?
As a test, I connected a local Logstash instance to the RDS database and indexed into a local Elasticsearch index. In that setup, all of the data appears to load correctly.
Any help is appreciated.