Optimize Logstash configuration

Hello. I'm syncing 4 tables of geo data from Postgres to Elasticsearch using Logstash, and I apply data changes using the doc_as_upsert option.

One table has more than 700,000 rows.

That's not many, but corrections do come in occasionally. When users edit rows in Postgres one by one, I'd like those modifications reflected in Elasticsearch immediately.

With the existing settings, more than half of the memory remains unused and CPU usage is around 4%.

How can I quickly synchronize the pg table and the ES index? In other words, is this the best setup for occasional upserts?

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://IP:DB_PORT/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
        jdbc_user => "atlas"
        jdbc_password => "*"
        jdbc_validate_connection => true
        jdbc_driver_library => "/lib/postgresql-42.2.12.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        schedule => "*/10 * * * *"
        statement => "SELECT region_id, region_type, country_code, country_code3, continent_code, source_from st_asgeojson(center_geo_point)::text as center_geo_point_text, center_longitude, center_latitude, jsonn::text from expedia_region_union order by region_id asc"
        jdbc_paging_enabled => true
        jdbc_page_size => "100000"
        tags => ["expedia_region_union"]
    }  
     
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://IP:DB_PORT/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
        jdbc_user => "atlas"
        jdbc_password => "*"
        jdbc_validate_connection => true
        jdbc_driver_library => "/lib/postgresql-42.2.12.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        schedule => "*/10 * * * *"
        statement => "SELECT id, iata_airport_code, name, name_full, country_code, center_latitude, center_longitude, name_kr as name_korean, name_full_kr as name_korean_full, st_asgeojson(center_geo_point)::text as center_geo_point_text, la, lo, public_flag, international_flag, source_from, source_time, iata_airport_metro_code, (select json_build_object('region_id', region_id, 'region_name', region_name, 'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::text from expedia_region_union where region_id = c.region_id) as haha_test from expedia_airport_more c"
        jdbc_paging_enabled => true
        jdbc_page_size => "100000"
        tags => ["expedia_airport_more"]
    } 
 
 
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://IP:DB_PORT/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
        jdbc_user => "atlas"
        jdbc_password => "*"
        jdbc_validate_connection => true
        jdbc_driver_library => "/lib/postgresql-42.2.12.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        schedule => "*/10 * * * *"
        statement => "SELECT source_from, region_name_kr as region_name_korean, region_id, region_name_full, continent_code, region_type, region_level descendants::text as descenants_text from expedia_region_continent"
        jdbc_paging_enabled => true
        jdbc_page_size => "100000"
        tags => ["expedia_region_continent"]
    } 
    stdin { codec => plain { charset => "UTF-8"} }
}
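
For reference, the output side of this pipeline (not shown above) uses doc_as_upsert roughly like this; the host, index name, and document_id field here are placeholders, not my actual settings:

output {
    if "expedia_region_union" in [tags] {
        elasticsearch {
            hosts => ["http://localhost:9200"]  # placeholder host
            index => "expedia_region_union"     # placeholder index name
            document_id => "%{region_id}"       # placeholder id field
            action => "update"
            doc_as_upsert => true               # create the document if it does not exist yet
        }
    }
}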

Right now, a fix only shows up in ES after about 2 hours.

Hi,

Maybe you could have a delta import running at a shorter interval?
Add a column to the table containing the date of the last change (a SQL sketch for this step follows the example below).
Add the column to the select statement.
Add the column as a condition to the statement (the variable is called sql_last_value).
Configure Logstash to track the last value with tracking_column, use_column_value and last_run_metadata_path.

This might look like the following:

jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:DB_PORT/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    jdbc_password => "*"
    jdbc_validate_connection => true
    jdbc_driver_library => "/lib/postgresql-42.2.12.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "* * * * *"
    statement => "SELECT last_modified, id, iata_airport_code, name, name_full, country_code, center_latitude, center_longitude, name_kr as name_korean, name_full_kr as name_korean_full, st_asgeojson(center_geo_point)::text as center_geo_point_text, la, lo, public_flag, international_flag, source_from, source_time, iata_airport_metro_code, (select json_build_object('region_id', region_id, 'region_name', region_name, 'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::text from expedia_region_union where region_id = c.region_id) as haha_test from expedia_airport_more c WHERE last_modified > :sql_last_value"
    jdbc_paging_enabled => true
    jdbc_page_size => "100000"
    tags => ["expedia_airport_more"]
    tracking_column => "last_modified"
    tracking_column_type => "timestamp"
    use_column_value => true
    last_run_metadata_path => "/path/to/last_run_date"
}
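
On the Postgres side, the column from the first step might be maintained like this; the column, function, and trigger names are just examples, and EXECUTE FUNCTION requires Postgres 11 or newer (use EXECUTE PROCEDURE on older versions):

ALTER TABLE expedia_airport_more
    ADD COLUMN last_modified timestamptz NOT NULL DEFAULT now();

CREATE OR REPLACE FUNCTION set_last_modified() RETURNS trigger AS $$
BEGIN
    -- stamp the row on every update so the delta query picks it up
    NEW.last_modified := now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_set_last_modified
    BEFORE UPDATE ON expedia_airport_more
    FOR EACH ROW EXECUTE FUNCTION set_last_modified();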

Best regards
Wolfram


Hello. Thanks for the reply. If I understand you correctly...

Should I configure the statement to fetch only the changed rows (after fetching the entire table once first)?

I'll ask you one more thing.
How can sql_last_value be given a value?

Can you elaborate on the last_run_metadata_path and sql_last_value parts in more detail? I don't know how to set sql_last_value.

I want to know how Logstash compares its last run date with the latest value in pg.

If you have a large amount of data, that would be better, yes. Note that on the very first run (when no metadata file exists yet) sql_last_value defaults to 1970-01-01 for timestamp columns, so the whole table is fetched once and only the delta afterwards.

sql_last_value is set automatically to the value of the tracking_column from the last result.
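
For illustration, after a run the file at last_run_metadata_path simply holds that last tracked value as YAML (the exact serialization can vary with the Logstash version and tracking_column_type); a timestamp column might be stored as:

--- 2020-05-18 09:10:00.000000000 Z

On the next run, :sql_last_value in the statement is replaced by this stored value, so only rows with a newer last_modified are fetched.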

Have a look here for details of the parameters: Jdbc input plugin | Logstash Reference [8.11] | Elastic

Best regards
Wolfram

Thank you for your timely response. It helped me a lot.
