First of all I can't be sure that the jdbc_pool_timeout parameter has no effect.
I use logstash 7.16 to sync data from mysql 8.0 to elasitcsearch 7.16 cluster.
Because this database is a production environment, there are always services writing data to this table.
The synchronization is started normally, but the speed is relatively slow. There are 45 million pieces of complete data.
According to the log, the synchronization lasted for about 11 hours, and then logstash printed an exception and exited. I only saw 785,000 pieces of data on Elasticsearch.
The strange thing is that the log printed by logstash is "java.sql.SQLException: Connection timed out (Read failed)", I didn't show to increase the jdbc_pool_timeout parameter, but it defaults to 5s, the wait_timeout and interactive_timeout of mysql are 180 and 28800 respectively, if the reason It is a timeout, why is it not disconnected after 28800 seconds but only after 11 hours?
[2022-02-21T02:43:25,546][ERROR][logstash.inputs.jdbc ][iotmessages-1][15655e21190d8bdc4d7c0050406d0e7ece2f4e7bd9cf5cc6acb1638683125f28] Java::JavaSql::SQLException: Connection timed out (Read failed): SELECT * FROM iotmessages WHERE (CreationTime > '2022-02-01 00:00:00') AND CreationTime < NOW() LIMIT 5000 OFFSET 785000
[2022-02-21T02:43:25,558][WARN ][logstash.inputs.jdbc ][iotmessages-1][15655e21190d8bdc4d7c0050406d0e7ece2f4e7bd9cf5cc6acb1638683125f28] Exception when executing JDBC query {:exception=>Sequel::DatabaseError, :message=>"Java::JavaSql::SQLException: Connection timed out (Read failed)", :cause=>"java.sql.SQLException: Connection timed out (Read failed)", :backtrace=>["com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(com/mysql/cj/jdbc/exceptions/SQLError.java:129)", "com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(com/mysql/cj/jdbc/exceptions/SQLExceptionsMapping.java:122)", "com.mysql.cj.jdbc.StatementImpl.executeQuery(com/mysql/cj/jdbc/StatementImpl.java:1201)", "jdk.internal.reflect.GeneratedMethodAccessor66.invoke(jdk/internal/reflect/GeneratedMethodAccessor66)", "jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:456)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:317)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.adapters.jdbc.execute(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/adapters/jdbc.rb:266)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.database.logging.log_connection_yield(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/database/logging.rb:43)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.adapters.jdbc.execute(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/adapters/jdbc.rb:266)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.adapters.jdbc.statement(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/adapters/jdbc.rb:702)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.adapters.jdbc.execute(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/adapters/jdbc.rb:261)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.connection_pool.threaded.hold(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/connection_pool/threaded.rb:92)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.database.connecting.synchronize(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/database/connecting.rb:269)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.adapters.jdbc.execute(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/adapters/jdbc.rb:260)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.dataset.actions.execute(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/dataset/actions.rb:1093)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.adapters.jdbc.fetch_rows(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/adapters/jdbc.rb:753)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.sequel_minus_5_dot_52_dot_0.lib.sequel.dataset.actions.each(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.52.0/lib/sequel/dataset/actions.rb:152)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_2.lib.logstash.plugin_mixins.jdbc.statement_handler.perform_query(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.2/lib/logstash/plugin_mixins/jdbc/statement_handler.rb:106)", "org.jruby.RubyKernel.loop(org/jruby/RubyKernel.java:1442)", "org.jruby.RubyKernel$INVOKER$s$0$0$loop.call(org/jruby/RubyKernel$INVOKER$s$0$0$loop.gen)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_2.lib.logstash.plugin_mixins.jdbc.statement_handler.perform_query(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.2/lib/logstash/plugin_mixins/jdbc/statement_handler.rb:104)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_2.lib.logstash.plugin_mixins.jdbc.jdbc.execute_statement(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.2/lib/logstash/plugin_mixins/jdbc/jdbc.rb:217)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_2.lib.logstash.inputs.jdbc.execute_query(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.2/lib/logstash/inputs/jdbc.rb:339)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_2.lib.logstash.inputs.jdbc.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.2/lib/logstash/inputs/jdbc.rb:307)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.inputworker(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:409)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_input(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:400)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:318)", "java.lang.Thread.run(java/lang/Thread.java:829)"]}
input {
jdbc {
jdbc_driver_library => "/etc/logstash/mysql-connector-java-8.0.28.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://XXXXXX"
jdbc_user => "xxxxx"
jdbc_password => "xxxxx"
jdbc_paging_enabled => "true"
jdbc_page_size => "5000"
jdbc_default_timezone => "Asia/Shanghai"
statement => "SELECT * FROM iotmessages WHERE (CreationTime > '2022-02-01 00:00:00') AND CreationTime < NOW() LIMIT :size OFFSET :offset"
record_last_run => "true"
use_column_value => "true"
tracking_column_type => "timestamp"
tracking_column => "CreationTime"
clean_run => "false"
jdbc_paging_mode => "explicit"
sql_log_level => "debug"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version"]
}
}
output {
elasticsearch {
hosts => "192.168.10.28:9200"
ilm_rollover_alias => "iot-iotmessages"
ilm_policy => "365-days-default"
ilm_pattern => "{now/d}-000001"
ilm_enabled => "true"
template_name => "iotmessages"
template_overwrite => "true"
document_id => "%{[@metadata][_id]}"
}
}