Loading large table from MySQL to ES via Logstash

Hello,

I have multiple large tables (each table contains ~15M records and consumes ~70GB of data).

My Logstash configuration is given below.

input {
        jdbc {
            jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/database?useSSL=false"
            jdbc_user => "root"
            jdbc_password => "root"
            jdbc_validate_connection => true
            jdbc_driver_library => "/Users/admin/Downloads/logstash-6.5.4/logstash-core/lib/mysql-connector-java-6.0.5.jar"
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            statement => "select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1"
            jdbc_paging_enabled => true
            jdbc_page_size => "5000"
          }
}
filter {
        mutate {
                add_field => { "status" => "add"}
        }
}
output {
        elasticsearch {
                index => "index_123"
                document_type => "document"
                user => "elastic"
                password => "pass"
                hosts => "localhost:9200"
                document_id => "%{id}"
                action => "update"
                doc_as_upsert => "true"
        }
}
  

For each table, Logstash takes 4-5 days to load the data from MySQL into ES. Initially it runs fast, but as it moves deeper into the table, each batch of data takes longer and longer.

Initially:


(0.762422s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 0

(0.865474s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 5000

(0.869017s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 10000

After a few million rows:


(582.38830s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 9365000

(580.187553s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 9370000

(586.855813s) SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1) AS `t1` LIMIT 5000 OFFSET 9375000

I have a use case where I want to load the updated columns of a table, frequently.

I have tried sql_last_value feature with tracking_column and tracking_column_type.

input {
  jdbc {
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/database?useSSL=false"
      jdbc_user => "root"
      jdbc_password => "root"
      jdbc_validate_connection => true
      jdbc_driver_library => "/Users/admin/Downloads/logstash-6.5.4/logstash-core/lib/mysql-connector-java-6.0.5.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      statement => "select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > :sql_last_value"
      jdbc_paging_enabled => true
      jdbc_page_size => "5000"
      use_column_value => true
      tracking_column => "id"
      tracking_column_type => "numeric"
    }
}
filter {
  mutate {
          add_field => { "status" => "add"}
  }
}
output {
  elasticsearch {
          index => "index_123"
          document_type => "document"
          user => "elastic"
          password => "pass"
          hosts => "localhost:9200"
          document_id => "%{id}"
          action => "update"
          doc_as_upsert => "true"
  }
}

But it stays at id > 0 for every query:

SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 0) AS `t1` LIMIT 5000 OFFSET 0

SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 0) AS `t1` LIMIT 5000 OFFSET 5000

SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 0) AS `t1` LIMIT 5000 OFFSET 10000

From my testing I found that the last value only gets stored and used in the next pipeline run, not between pages within the same run.

But to optimize the Logstash process, I want something like this:


SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 0) AS `t1` LIMIT 5000

SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 5000) AS `t1` LIMIT 5000

SELECT * FROM (select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > 10000) AS `t1` LIMIT 5000

Because I think a dynamic WHERE clause (id > lastId) would speed up the process.

Any help would be highly appreciated!

I'm not a jdbc user, so there is some speculation embedded here, but I will offer it as a line of investigation for you.

Your basic problem here appears to be that each time you execute the query you are fetching up to 9.4 million rows from the table and then throwing almost all of them away. Then you are doing that again for the next 5000 rows. That takes time.

Any solution that involves a huge OFFSET value will be slow -- paging will not work well with this result set.

If 'id' is a key with which you can order all the rows and use as the tracking column, then you may be able to optimize this.

Schedule the input to run once per second. That means that the input will run the query to fetch a bunch of rows, then quickly start the next iteration.

I don't speak SQL, so I don't know how to write the query, but in English what you want is to do a select with 'ORDER BY id' from the entire table, and from that select 'id > :sql_last_value' with 'LIMIT 5000'. This effectively implements paging in a different way.
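In SQL terms, that description translates to a keyset-pagination query, roughly like this (a sketch, untested; :sql_last_value is the jdbc input's substitution parameter, and this is only fast if id is indexed):

```sql
-- Keyset pagination: seek past the last-seen id instead of scanning
-- and discarding millions of rows with OFFSET.
SELECT id, updated_at, updated_column1, updated_column2
FROM database.table
WHERE isUpdated = 1 AND id > :sql_last_value
ORDER BY id
LIMIT 5000
```

Each run starts where the previous one stopped, so the cost per batch stays roughly constant instead of growing with the offset.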

Provided that the index fits in the buffer cache that should perform reasonably well. If the index does not fit it will thrash the cache and performance will go to hell in a handbasket.

@Badger thanks for replying :slightly_smiling_face:

Absolutely, that is the behaviour of logstash-jdbc when you provide a query statement with paging enabled. If Logstash had a feature to assign a dynamic value to every query while paging (just like it does with the offset value), this type of use case would benefit.

I tried the suggested solution and it works nicely for continuously loading the latest data. It can be done by configuring the jdbc input like this (you may not need LIMIT in this method):

        jdbc {
            jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/database?useSSL=false"
            jdbc_user => "root"
            jdbc_password => "root"
            jdbc_validate_connection => true
            jdbc_driver_library => "/Users/admin/Downloads/logstash-6.5.4/logstash-core/lib/mysql-connector-java-6.0.5.jar"
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            statement => "select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id > :sql_last_value LIMIT 5000"
            schedule => "* * * * *"
            use_column_value => true
            tracking_column => "id"
            tracking_column_type => "numeric"
            last_run_metadata_path => "./sql_last_value.yml"
          }

But if, like me, someone has a multi-step data pipeline with Logstash as just one of the steps, then this is not the solution. It has the following possible issues:

  1. Scheduling can't be done every second; the minimum cron interval is 60s.
  2. It's a cron job that triggers the process at every interval, so you don't know exactly when the Logstash process has finished, after which you may want to kill it and move on to the next process.

I have implemented a solution, in case someone faces the same situation as me.

You can invoke Logstash synchronously, once per batch of data.

  jdbc {
            jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/database?useSSL=false"
            jdbc_user => "root"
            jdbc_password => "root"
            jdbc_validate_connection => true
            jdbc_driver_library => "/Users/admin/Downloads/logstash-6.5.4/logstash-core/lib/mysql-connector-java-6.0.5.jar"
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            statement => "select id, updated_at, updated_column1, updated_column2 from database.table where isUpdated = 1 and id >= firstIdOfBatch and id <= lastIdOfBatch"
            jdbc_paging_enabled => true
            jdbc_page_size => "5000"
          }

The key here is the WHERE clause:

where isUpdated = 1 and id >= firstIdOfBatch and id <= lastIdOfBatch;

Here you need to decide the optimum data-batch size and jdbc_page_size.

I have kept jdbc_page_size at 5000, and each data batch is 0.2M rows.
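The synchronous batch driver can be sketched as a small shell loop. Everything here is illustrative: the MAX_ID value, config filename, and variable names are assumptions, and the real Logstash invocation is shown as a comment (Logstash configs can reference environment variables with the ${VAR} syntax, so the statement could use ${FIRST_ID_OF_BATCH} and ${LAST_ID_OF_BATCH} in place of the literal placeholders):

```shell
#!/bin/sh
# Sketch of a synchronous batch driver for the approach above.
BATCH_SIZE=200000          # 0.2M rows per Logstash invocation
MAX_ID=600000              # highest id in the table (placeholder value)
first=1
while [ "$first" -le "$MAX_ID" ]; do
  last=$((first + BATCH_SIZE - 1))
  # A real run would block here until the batch finishes, keeping batches
  # sequential, e.g.:
  # FIRST_ID_OF_BATCH=$first LAST_ID_OF_BATCH=$last bin/logstash -f batch.conf
  echo "batch: id >= $first and id <= $last"
  first=$((last + 1))
done
```

Because each invocation only exits when its batch is fully indexed, the driver always knows when it is safe to move on to the next step of the pipeline.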

The downside of this solution is the time and CPU cost of invoking Logstash multiple times. But for now it has saved me a lot of time compared to the original setup: instead of 4-5 days for one table, the Logstash process completes in 3-4 hours.

Still, the best way would be to have the query parameter set dynamically, like this:


SELECT * FROM (select [COLUMNS] from database.table where id > 0) AS `t1` LIMIT 5000
SELECT * FROM (select [COLUMNS] from database.table where id > 5000) AS `t1` LIMIT 5000
SELECT * FROM (select [COLUMNS] from database.table where id > 10000) AS `t1` LIMIT 5000

Not so. The schedule option on the jdbc input uses the rufus scheduler, and when parsing cronlines, if there are six fields the first one is treated as seconds. A single Logstash process will execute the query as often as the cronline tells it to; the process is never complete.
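Concretely, with rufus-scheduler a six-field cronline puts seconds first, so (if I read its docs right) a schedule like this would run the query every five seconds:

```
schedule => "*/5 * * * * *"
```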

Ohh, my bad. I didn't check whether the rufus scheduler offers this facility.

Hi, welcome to the forum.

Without going into a lot more detail: I have done this process, something like this.

The database has multimillion-record tables, and this is what we do initially.

For the first load of data we use some time period (three months in my case) until we are up to date.
We run this Logstash from the command line.

The SQL we use:
select x,y,z from database where update_time > x-date and update_time < y-date (three months, then the next three months, and so on)

Once all data is pulled up to today, Logstash goes into the pipeline and ingests data every 30 minutes:

select x,y,z from database where update_time > sysdate-interval '30' minute

In this process we are not relying on sql_last_value or tracking_column. Everything happens via the update_time column in the database table, because every record, when updated, also updates this column.
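The interval query above looks like Oracle syntax (sysdate, interval '30' minute); since this thread is about MySQL, the equivalent there would be roughly (a sketch, untested, with placeholder column names):

```sql
-- Sketch: MySQL equivalent of the 30-minute incremental pull.
SELECT x, y, z
FROM database.table
WHERE update_time > NOW() - INTERVAL 30 MINUTE
```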