JDBC input/output with additional data transforming

Hello folks. I am pretty new in logstash. I have a task reload lot of data 5B records from mysql to another mysql with some aggregation. I need to load all fields param1, date group them and take count rows in main table and count rows in second dependent table.
I made it simple based on MySQL aggregation:

  input{
    jdbc {
          jdbc_connection_string =>"jdbc:mysql://host:3306/dbname"
          jdbc_user => "db_user"
          jdbc_password_filepath => "secret_path"
          jdbc_driver_library => "mysql-connector-java-5.1.48.jar"
          jdbc_driver_class => 'com.mysql.jdbc.Driver'
          lowercase_column_names => false
          schedule => '*/1 * * * *'
          statement => "select param1, date, id, count(items.id) as cnt_items from tbl1 join items using(param1) where id > :sql_last_value and date is not null order by id limit 50000"
          tracking_column_type => 'numeric'
          use_column_value => true
          tracking_column => 'id'
          last_run_metadata_path => "job_file"
    }
  }
  output{
    jdbc {
        driver_class => "com.mysql.jdbc.Driver"
        driver_jar_path => "mysql-connector-java-5.1.48.jar"
        connection_string => "jdbc:mysql://host2:3306/dbname2?user=${MYSQL_USER}&password=${MYSQL_PASSWORD}"
        statement => ["INSERT INTO report(param1, date, cnt_ids,cnt_items) values (?,1,?)
              on duplicate key update cnt_ids = cnt_ids + 1, cnt_items = cnt_items + values(cnt_items)",
              ""%{param1}","%{date}","%{cnt_items}"]
    }
  }

It worked correctly, but in PROD it's very very slow. With 1M batch it took 25-30 mins per batch, with 50k it took 2 mins but for amount 5B it will take forever.

I have 2 ideas:

  1. change input and take already aggregated data like param1, date, cnt_id, max(id), cnt_items. But it will produce higher load to source DB:
statement => "select param1, date, count(id) as cnt_ids, max(id) as tracking_column, count(items.id) as cnt_items from tbl1 join items using(param1) where id between :sql_last_value and :sql_last_value + BATCH and date is not null order by id group by 1,2"
  1. use aggregate filter, but I failed with it:
  input{
    jdbc {
          jdbc_connection_string =>"jdbc:mysql://host:3306/dbname"
          jdbc_user => "db_user"
          jdbc_password_filepath => "secret_path"
          jdbc_driver_library => "mysql-connector-java-5.1.48.jar"
          jdbc_driver_class => 'com.mysql.jdbc.Driver'
          lowercase_column_names => false
          schedule => '*/1 * * * *'
          statement => "select param1, date, id, count(items.id) as cnt_items from tbl1 join items using(param1) where id > :sql_last_value and date is not null order by id limit 50000"
          tracking_column_type => 'numeric'
          use_column_value => true
          tracking_column => 'id'
          last_run_metadata_path => "job_file"
    }
  }

  filter{
    aggregate {
       task_id => "%{param1}%{date}"
       code => "
          map['cnt_ids'] ||= 0
          map['cnt_ids'] += 1
          map['cnt_items'] ||= 0
          map['cnt_items'] += event.get('cnt_items')
        "
        push_map_as_event_on_timeout => true
        timeout => 30
     }
  }
  output{
    jdbc {
        driver_class => "com.mysql.jdbc.Driver"
        driver_jar_path => "mysql-connector-java-5.1.48.jar"
        connection_string => "jdbc:mysql://host2:3306/dbname2?user=${MYSQL_USER}&password=${MYSQL_PASSWORD}"
        statement => ["INSERT INTO report(param1,cnt_ids,cnt_items) values (?,?,?,?)
              on duplicate key update cnt_ids = cnt_ids +  values(cnt_ids), cnt_items = cnt_items + values(cnt_items)",
              ""%{param1}","%{date}","%{cnt_ids}","%{cnt_items}"]
    }
  }

it failed all the time with error like:

[2021-11-11T14:54:00,967][ERROR][logstash.outputs.jdbc    ][main][114270b357686994e97cab36f611f67cb91e4e00b53127aedbde778a59ea1044] JDBC - Exception. Not retrying 
{:exception=>java.sql.SQLException: Incorrect integer value: '%{cnt_ids}' for column 'cnt_ids' at row 1, 
:statement=>"INSERT INTO report(param1,cnt_ids,cnt_items) values (?,?,1,?)\n            on duplicate key update cnt_ids = cnt_ids + values(cnt_ids), cnt_items = cnt_items + values(cnt_items)", 
:event=>"{\"@version\":\"1\",\"param1\":5573736283261537267,\"date\":201306,\"@timestamp\":\"2021-11-11T14:54:00.612Z\",\"cnt_items\":2,\"id\":44771479}"}

Looks like final event doesn't contains new filed. But why?
Maybe I use wrong pattern and there are better solution for my task.

I have never done anything like this.
However when I see : INSERT INTO report(param1,cnt_ids,cnt_items) values (?,?,1,?)
you seem to supply 4 values of which the 3rd is a 1.

For the first implementation, it works. I don't think it's a good solution.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.