Hello folks. I am pretty new to Logstash. I have a task to reload a lot of data (5B records) from one MySQL database to another with some aggregation: for each (param1, date) group I need the row count from the main table and the row count from a second, dependent table.
I started with a simple version based on MySQL aggregation:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://host:3306/dbname"
    jdbc_user => "db_user"
    jdbc_password_filepath => "secret_path"
    jdbc_driver_library => "mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    lowercase_column_names => false
    schedule => "*/1 * * * *"
    statement => "select param1, date, id, count(items.id) as cnt_items from tbl1 join items using(param1) where id > :sql_last_value and date is not null group by id order by id limit 50000"
    tracking_column_type => "numeric"
    use_column_value => true
    tracking_column => "id"
    last_run_metadata_path => "job_file"
  }
}
output {
  jdbc {
    driver_class => "com.mysql.jdbc.Driver"
    driver_jar_path => "mysql-connector-java-5.1.48.jar"
    connection_string => "jdbc:mysql://host2:3306/dbname2?user=${MYSQL_USER}&password=${MYSQL_PASSWORD}"
    statement => ["INSERT INTO report(param1, date, cnt_ids, cnt_items) values (?,?,1,?)
      on duplicate key update cnt_ids = cnt_ids + 1, cnt_items = cnt_items + values(cnt_items)",
      "%{param1}", "%{date}", "%{cnt_items}"]
  }
}
It worked correctly, but in PROD it is very, very slow: a 1M-row batch took 25-30 minutes, and a 50k batch took about 2 minutes, so 5B records would take forever.
I have 2 ideas:
- change the input to fetch already aggregated data (param1, date, cnt_ids, max(id), cnt_items). But this puts a higher load on the source DB:
statement => "select param1, date, count(id) as cnt_ids, max(id) as tracking_column, count(items.id) as cnt_items from tbl1 join items using(param1) where id between :sql_last_value and :sql_last_value + BATCH and date is not null group by 1, 2 order by tracking_column"
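For reference, a sketch of what the input block for this variant might look like. BATCH cannot be parameterized by the jdbc input, so a hard-coded value (50000 here, purely illustrative) would stand in for it, and tracking_column points at the max(id) alias:

```
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://host:3306/dbname"
    jdbc_user => "db_user"
    jdbc_password_filepath => "secret_path"
    jdbc_driver_library => "mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    lowercase_column_names => false
    schedule => "*/1 * * * *"
    # 50000 stands in for BATCH; the jdbc input has no placeholder for it
    statement => "select param1, date, count(id) as cnt_ids, max(id) as tracking_column, count(items.id) as cnt_items from tbl1 join items using(param1) where id between :sql_last_value and :sql_last_value + 50000 and date is not null group by 1, 2 order by tracking_column"
    tracking_column_type => "numeric"
    use_column_value => true
    # track the max(id) alias instead of id
    tracking_column => "tracking_column"
    last_run_metadata_path => "job_file"
  }
}
```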
- use the aggregate filter, but I failed with it:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://host:3306/dbname"
    jdbc_user => "db_user"
    jdbc_password_filepath => "secret_path"
    jdbc_driver_library => "mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    lowercase_column_names => false
    schedule => "*/1 * * * *"
    statement => "select param1, date, id, count(items.id) as cnt_items from tbl1 join items using(param1) where id > :sql_last_value and date is not null group by id order by id limit 50000"
    tracking_column_type => "numeric"
    use_column_value => true
    tracking_column => "id"
    last_run_metadata_path => "job_file"
  }
}
filter {
  aggregate {
    task_id => "%{param1}%{date}"
    code => "
      map['cnt_ids'] ||= 0
      map['cnt_ids'] += 1
      map['cnt_items'] ||= 0
      map['cnt_items'] += event.get('cnt_items')
    "
    push_map_as_event_on_timeout => true
    timeout => 30
  }
}
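For comparison, here is a minimal sketch of the same aggregation following the pattern in the aggregate filter docs, assuming pipeline.workers is set to 1 (the filter requires a single worker for its maps to be shared). It cancels the original per-row events so that only the aggregated timeout events reach the output, stores the group keys in the map so the pushed event carries them, and uses the standard timeout_task_id_field / timeout_tags options:

```
filter {
  aggregate {
    task_id => "%{param1}%{date}"
    code => "
      # keep the group keys in the map so the timeout event has them
      map['param1'] ||= event.get('param1')
      map['date']   ||= event.get('date')
      map['cnt_ids'] ||= 0
      map['cnt_ids'] += 1
      map['cnt_items'] ||= 0
      map['cnt_items'] += event.get('cnt_items')
      # drop the per-row event; only the pushed map becomes an output event
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "task_id"
    timeout_tags => ["aggregated"]
    timeout => 30
  }
}
```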
output {
  jdbc {
    driver_class => "com.mysql.jdbc.Driver"
    driver_jar_path => "mysql-connector-java-5.1.48.jar"
    connection_string => "jdbc:mysql://host2:3306/dbname2?user=${MYSQL_USER}&password=${MYSQL_PASSWORD}"
    statement => ["INSERT INTO report(param1, date, cnt_ids, cnt_items) values (?,?,?,?)
      on duplicate key update cnt_ids = cnt_ids + values(cnt_ids), cnt_items = cnt_items + values(cnt_items)",
      "%{param1}", "%{date}", "%{cnt_ids}", "%{cnt_items}"]
  }
}
It failed every time with an error like:
[2021-11-11T14:54:00,967][ERROR][logstash.outputs.jdbc ][main][114270b357686994e97cab36f611f67cb91e4e00b53127aedbde778a59ea1044] JDBC - Exception. Not retrying
{:exception=>java.sql.SQLException: Incorrect integer value: '%{cnt_ids}' for column 'cnt_ids' at row 1,
:statement=>"INSERT INTO report(param1,cnt_ids,cnt_items) values (?,?,1,?)\n on duplicate key update cnt_ids = cnt_ids + values(cnt_ids), cnt_items = cnt_items + values(cnt_items)",
:event=>"{\"@version\":\"1\",\"param1\":5573736283261537267,\"date\":201306,\"@timestamp\":\"2021-11-11T14:54:00.612Z\",\"cnt_items\":2,\"id\":44771479}"}
It looks like the final event doesn't contain the new field. But why?
Maybe I am using the wrong pattern and there is a better solution for my task.