My MySQL server runs in the America/Chicago timezone (CDT/CST).
I verified this with the following SQL query:
SELECT @@global.time_zone, @@session.time_zone;
My simple table looks like this:
SELECT * FROM brand ORDER BY updated_ts;
I have a Logstash 6.4.0 Docker image (the container runs in UTC) with the following configuration:
###
# Configuration file for brand ingestion
#
# Directory structure of logstash for docker is mentioned here.
# https://www.elastic.co/guide/en/logstash/current/dir-layout.html#docker-layout
###
input {
  jdbc {
    # JDBC driver location
    jdbc_driver_library => "/usr/share/logstash/bin/mysql-connector-java-8.0.12.jar"
    # JDBC driver class (the updated one; use this)
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # DB connection string
    # Note: If the timezone of the system running Logstash differs from the MySQL server timezone,
    # update the connection string to match the MySQL server timezone. Check it with:
    # SELECT @@global.time_zone, @@session.time_zone;
    jdbc_connection_string => "jdbc:mysql://SERVER:3306/SCHEMA?serverTimezone=America/Chicago&useSSL=false"
    jdbc_user => "USER"
    jdbc_password => "XXXX"
    # If true, the previous execution state is not persisted. Set to false because we also have updates
    clean_run => false
    # Set the timezone
    # Verify the timezone of the MySQL server and update here if necessary. This is crucial for
    # recording the last run time.
    jdbc_default_timezone => "America/Chicago"
    # Schedule
    # TODO: update the schedule; with six cron fields the first one is seconds,
    # so for testing this currently fires once a minute, at second 5
    schedule => "5 * * * * *"
    # SQL
    # Note: Do not end the SQL with a separator (;)
    # Specify ORDER BY "tracking_column" if record_last_run is true, so that the correct time is recorded
    statement => "SELECT
        b.brand_id AS 'brand_id',
        b.brand_name AS 'brand_name',
        b.brand_logo AS 'brand_logo',
        b.updated_ts as 'updated_ts'
      FROM
        brand b
      WHERE b.updated_ts > :sql_last_value
      ORDER BY updated_ts, b.brand_id ASC"
    # Specify the fetch size; if not provided, the JDBC driver default is used
    jdbc_fetch_size => 1000
    # Must be set to use the specified column as the identifier of the persisted state
    use_column_value => true
    # Name of the column used to track the previous execution (persisted state)
    # Needs to be the most recently updated value
    tracking_column => "updated_ts"
    # Type of the tracking column - very important!
    tracking_column_type => "timestamp"
    # Record the state of each run in 'last_run_metadata_path'
    record_last_run => true
    # The place to keep the tracking state; let's use the data directory
    last_run_metadata_path => "/usr/share/logstash/data/logstash_jdbc_last_run_ingest_brand_01.txt"
  }
}
filter {
  # Trim leading and trailing blank spaces
  mutate {
    strip => ["brand_name"]
  }
}
output {
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
}
The first time, the query is fired correctly and fetches the required records, as below:
SELECT
    b.brand_id AS 'brand_id',
    b.brand_name AS 'brand_name',
    b.brand_logo AS 'brand_logo',
    b.updated_ts as 'updated_ts'
FROM
    brand b
WHERE b.updated_ts > '1969-12-31 18:00:00'
ORDER BY updated_ts, b.brand_id ASC
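The '1969-12-31 18:00:00' literal in this first query looks like the Unix epoch (1970-01-01 00:00:00 UTC) rendered in America/Chicago. A minimal Python check of that reading, assuming a fixed UTC-6 offset (CST, which I believe applied on that date) rather than a full tz database lookup; the function name is only for illustration:

```python
from datetime import datetime, timedelta, timezone

# Assumption: America/Chicago was on CST (UTC-6) on 1969-12-31.
CST = timezone(timedelta(hours=-6), "CST")


def epoch_in_cst():
    """Render the Unix epoch as a wall-clock string in CST."""
    epoch_utc = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch_utc.astimezone(CST).strftime("%Y-%m-%d %H:%M:%S")


print(epoch_in_cst())  # 1969-12-31 18:00:00
```

So the initial value at least appears to be converted into the server timezone as I would expect.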
Now, the last run time gets recorded as:
--- !ruby/object:DateTime '2018-08-29 10:16:16.000000000 -05:00'
This looks incorrect; it should be '2018-08-29 5:16:16.000000000 -05:00'.
NOTE: I will be losing records that get updated between '2018-08-29 5:16:16.000000000 CDT' and '2018-08-29 10:16:16.000000000 CDT'.
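To make the five-hour gap concrete, here is a small Python sketch. It is not part of the pipeline, the helper name `parse_last_run` is mine, and it assumes a fixed UTC-5 offset for CDT on that date. It parses the recorded metadata line and shows that the recorded wall-clock time, when read as UTC, lands exactly on the value I expected in CDT. That would be consistent with a UTC value being labelled with the -05:00 offset, but it does not prove where the shift happens.

```python
import re
from datetime import datetime, timedelta, timezone

# Assumption: America/Chicago was on CDT (UTC-5) on 2018-08-29.
CDT = timezone(timedelta(hours=-5), "CDT")


def parse_last_run(text):
    """Extract the timestamp from a last-run metadata line (hypothetical helper)."""
    match = re.search(
        r"'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.(\d+) ([+-]\d{2}:\d{2})'", text
    )
    if match is None:
        raise ValueError("no timestamp found in: " + text)
    stamp, fraction, offset = match.groups()
    # strptime's %f accepts at most 6 digits, so trim the 9-digit fraction.
    micros = fraction[:6].ljust(6, "0")
    return datetime.strptime(
        f"{stamp}.{micros} {offset}", "%Y-%m-%d %H:%M:%S.%f %z"
    )


recorded = parse_last_run(
    "--- !ruby/object:DateTime '2018-08-29 10:16:16.000000000 -05:00'"
)
expected = datetime(2018, 8, 29, 5, 16, 16, tzinfo=CDT)

# Distance between what was stored and what I expected.
gap = recorded - expected

# Keep the recorded wall-clock digits but treat them as UTC, then convert to CDT.
recorded_read_as_utc = recorded.replace(tzinfo=timezone.utc).astimezone(CDT)

print(gap)                               # 5:00:00
print(recorded_read_as_utc == expected)  # True
```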
The next time, the query is executed as below:
SELECT
    b.brand_id AS 'brand_id',
    b.brand_name AS 'brand_name',
    b.brand_logo AS 'brand_logo',
    b.updated_ts as 'updated_ts'
FROM
    brand b
WHERE b.updated_ts > '2018-08-29 10:16:16'
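The effect of that predicate can be sketched in Python with a few made-up rows (the brand ids and timestamps below are hypothetical, not taken from my table). Any row whose updated_ts is later than the value I expected but not later than the recorded value would never be selected:

```python
from datetime import datetime

# Hypothetical rows: (brand_id, updated_ts as a naive CDT wall-clock time).
rows = [
    (1, datetime(2018, 8, 29, 5, 10, 0)),
    (2, datetime(2018, 8, 29, 7, 30, 0)),
    (3, datetime(2018, 8, 29, 9, 45, 0)),
    (4, datetime(2018, 8, 29, 10, 20, 0)),
]


def fetch_after(all_rows, last_value):
    """Mimic WHERE updated_ts > :sql_last_value ORDER BY updated_ts, brand_id."""
    hits = [row for row in all_rows if row[1] > last_value]
    return sorted(hits, key=lambda row: (row[1], row[0]))


recorded = datetime(2018, 8, 29, 10, 16, 16)  # value actually stored
expected = datetime(2018, 8, 29, 5, 16, 16)   # value I expected to be stored

picked = [row[0] for row in fetch_after(rows, recorded)]
wanted = [row[0] for row in fetch_after(rows, expected)]
missed = [brand_id for brand_id in wanted if brand_id not in picked]

print(picked)  # [4]
print(wanted)  # [2, 3, 4]
print(missed)  # [2, 3]
```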
Can you please suggest what I am doing incorrectly here?