Loading a large table from MySQL into ES with Logstash

Hi guys,

I have a large table in MySQL which contains 58 million tweet records, and its size is continuously increasing as I am streaming from Twitter. The schema of the table is as follows:

CREATE TABLE IF NOT EXISTS `tweets` (
  `id` bigint(20) NOT NULL DEFAULT '0',
  `text` varchar(200) DEFAULT NULL,
  `lang` varchar(5) DEFAULT NULL,
  `followers` int(11) DEFAULT NULL,
  `username` varchar(50) DEFAULT NULL,
  `createdat` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;  

I want to load all of this data (both the existing 58M records and the new streaming tweets) into ES using the JDBC input plugin of Logstash. I constructed my conf file as follows:

input {
  jdbc {
    jdbc_driver_library => "/home/inanc/ELK/logstashConfFiles/mysql-connector-java-5.1.40-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/twitter"
    jdbc_user => "user"
    jdbc_password => "pass"
   # jdbc_fetch_size => 10000
    jdbc_paging_enabled => true
    jdbc_page_size => 200000
    schedule => "* * * * *"
    statement => "select id,text,lang,followers,username,from_unixtime(createdat/1000) as created_at from tweets WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    #tracking_column_type => "numeric"
    #clean_run => true
    #last_run_metadata_path => "/home/inanc/ELK/logstashConfFiles/.logstash_jdbc_last_run"
  }
}
output {
  #stdout { codec => json_lines }
  elasticsearch {
    index => "twitter"
    document_type => "tweets"
    document_id => "%{id}"
    hosts => "localhost"
  }
}

It seems to be working; however, it is very slow. Each query initially took about 1 second, but the time has kept increasing. I have now imported about 31M records and the queries take much longer, as shown below:

[2016-12-02T23:27:37,093][INFO ][logstash.inputs.jdbc     ] (37.832000s) SELECT * FROM (select id,text,lang,followers,username,from_unixtime(createdat/1000) as created_at from tweets WHERE id > 0) AS `t1` LIMIT 200000 OFFSET 31000000
[2016-12-02T23:28:31,366][INFO ][logstash.inputs.jdbc     ] (38.131000s) SELECT * FROM (select id,text,lang,followers,username,from_unixtime(createdat/1000) as created_at from tweets WHERE id > 0) AS `t1` LIMIT 200000 OFFSET 31200000
[2016-12-02T23:29:26,204][INFO ][logstash.inputs.jdbc     ] (38.305000s) SELECT * FROM (select id,text,lang,followers,username,from_unixtime(createdat/1000) as created_at from tweets WHERE id > 0) AS `t1` LIMIT 200000 OFFSET 31400000

As you can see, :sql_last_value does not seem to be working, so the large OFFSET makes each query take a very long time. Maybe it will start working once the whole table is loaded and Logstash only has to wait for newly streamed tweets. Is there a way to make it faster?
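
One idea I haven't tested yet is to drop the OFFSET-based paging entirely and do keyset pagination in the statement itself, so that :sql_last_value decides where each batch starts. A rough sketch (the LIMIT of 200000 is just a guess, and it assumes id only ever grows):

input {
  jdbc {
    jdbc_driver_library => "/home/inanc/ELK/logstashConfFiles/mysql-connector-java-5.1.40-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/twitter"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "* * * * *"
    # jdbc_paging_enabled is left off so Logstash does not wrap the query in its own LIMIT/OFFSET;
    # the statement fetches the next batch by id instead
    statement => "select id,text,lang,followers,username,from_unixtime(createdat/1000) as created_at from tweets where id > :sql_last_value order by id asc limit 200000"
    use_column_value => true
    tracking_column => "id"
  }
}

Because the query is ordered by id and limited, each scheduled run should pick up right after the last tracked id instead of making MySQL skip millions of rows.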

I am using ES 5.0.2 and Logstash 5.0.2.

Thanks,

Okay guys, as I expected, :sql_last_value started to update itself after all 58M rows had been loaded. It took some time to load the 58M rows because of the OFFSET issue, but now it works fine and fast for the new streaming tweets.

By the way, I don't know why, but there are some missing documents. There are 59,999,412 rows in MySQL, but only 59,999,252 documents in ES. Maybe some of the documents are not being parsed correctly, or :sql_last_value is missing something.
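
For reference, these are roughly the checks I am running to compare the two sides (user, database, and index names are the ones from my config above; the metadata file location assumes the plugin default):

# row count on the MySQL side
mysql -u user -p twitter -e 'SELECT COUNT(*) FROM tweets;'
# document count on the ES side
curl -s 'localhost:9200/twitter/_count?pretty'
# last tracked id value that the jdbc input will resume from
cat ~/.logstash_jdbc_last_run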

If ES rejected one or more documents (e.g. because of a mapping conflict; totally possible) then there should be something in the log about it.

