Hello,
I have multiple large tables (each table contains ~15M records and consumes ~70GB of data).
My Logstash configuration is given below.
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/database?useSSL=false"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_validate_connection => true
    jdbc_driver_library => "/Users/admin/Downloads/logstash-6.5.4/logstash-core/lib/mysql-connector-java-6.0.5.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1"
    jdbc_paging_enabled => true
    jdbc_page_size => 5000
  }
}
filter {
  mutate {
    add_field => { "status" => "add" }
  }
}
output {
  elasticsearch {
    index => "index_123"
    document_type => "document"
    user => "elastic"
    password => "pass"
    hosts => "localhost:9200"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
  }
}
For each table, Logstash takes 4-5 days to load the data from MySQL into Elasticsearch. Initially it runs fast, but the deeper it gets into the table, the longer each batch of data takes.
Initially:
(0.762422s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 0
(0.865474s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 5000
(0.869017s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 10000
After a few million rows:
(582.38830s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 9365000
(580.187553s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 9370000
(586.855813s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 9375000
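If I understand the pattern correctly (my assumption), the derived-table wrapper forces MySQL to scan and discard every row before the OFFSET on each page, so page time grows with the offset. An EXPLAIN on a deep page is a quick way to confirm this, since the rows estimate covers everything skipped, not just the 5000 rows returned:
  -- sanity check (not from the Logstash logs): EXPLAIN on a deep page
  EXPLAIN SELECT * FROM (
    select id, updated_at, updated_column1, updated_column2
    from database.table where isUpdated = 1
  ) AS `t1` LIMIT 5000 OFFSET 9365000;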
My use case is that I want to sync the updated columns of a table to Elasticsearch frequently.
I have tried the sql_last_value feature with tracking_column and tracking_column_type:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/database?useSSL=false"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_validate_connection => true
    jdbc_driver_library => "/Users/admin/Downloads/logstash-6.5.4/logstash-core/lib/mysql-connector-java-6.0.5.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > :sql_last_value"
    jdbc_paging_enabled => true
    jdbc_page_size => 5000
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
  }
}
filter {
  mutate {
    add_field => { "status" => "add" }
  }
}
output {
  elasticsearch {
    index => "index_123"
    document_type => "document"
    user => "elastic"
    password => "pass"
    hosts => "localhost:9200"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
  }
}
But the substituted value stays id > 0 for every query within the run:
SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 0) AS `t1` LIMIT 5000 OFFSET 0
SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 0) AS `t1` LIMIT 5000 OFFSET 5000
SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 0) AS `t1` LIMIT 5000 OFFSET 10000
From my testing, the last tracked value is only stored and used on the next pipeline run, not between pages within the same run.
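For reference, the plugin persists that value on disk between runs; the default location is $HOME/.logstash_jdbc_last_run, and it can be set per pipeline (the path below is just an example):
  # hypothetical per-table file for the persisted :sql_last_value
  last_run_metadata_path => "/Users/admin/.logstash_jdbc_last_run_table1"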
But to optimize the Logstash process, I want each page to seek on id instead of using an OFFSET, like this:
SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 0) AS `t1` LIMIT 5000
SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 5000) AS `t1` LIMIT 5000
SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 10000) AS `t1` LIMIT 5000
I think a dynamic WHERE clause (id > lastId) would keep every page fast, because MySQL could seek on the indexed id column instead of scanning and discarding all the rows before the OFFSET.
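Something along these lines is what I have in mind (an untested sketch: paging disabled, the LIMIT and ORDER BY moved into the statement, and a schedule so the persisted :sql_last_value advances between runs; the cron value and the ORDER BY are my assumptions):
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/database?useSSL=false"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_validate_connection => true
    jdbc_driver_library => "/Users/admin/Downloads/logstash-6.5.4/logstash-core/lib/mysql-connector-java-6.0.5.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # paging stays off: the LIMIT lives in the statement itself,
    # so no "LIMIT ... OFFSET ..." wrapper is generated
    statement => "select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > :sql_last_value order by id asc limit 5000"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # run every minute; each run would then seek past the rows
    # already shipped and process at most 5000 new ones
    schedule => "* * * * *"
  }
}
# filter and output as before
Is this the right approach, or is there a supported way to make :sql_last_value advance per page instead of per run?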
Any help would be highly appreciated!