Logstash: synchronizing MySQL data to Elasticsearch

Hello experts:
My software versions: Elasticsearch 6.5.1, Logstash 6.5.1, MySQL 5.7.
Recently I ran into a problem when using Logstash to synchronize MySQL data to Elasticsearch. All of the data comes from the same MySQL database, but after several tables are synchronized to ES, the datetime-type fields end up shifted by different amounts: for example, add_time in table 1 is increased by 6h in ES, while update_time in table 2 is increased by 14h in ES. There is another case: when I synchronize the same data from a single table twice, the time offset after the first run differs from the time offset after the second run. What is going on? Is there some source of instability in Logstash? I don't have a clue at present, please advise.
Below are two of my Logstash synchronization configuration files:
1.conf
input {
  # Also accept events typed on standard input.
  stdin {
  }

  # Pull rows from MySQL over JDBC on a cron schedule.
  jdbc {
    # Database connection (host and port are redacted in this listing).
    # NOTE(review): no serverTimezone is given in the URL and
    # jdbc_default_timezone (below) is commented out, so datetime columns are
    # converted with whatever defaults the driver and JVM pick up. This is the
    # likely source of the shifted values in Elasticsearch; confirm the
    # database time zone and set both explicitly.
    jdbc_connection_string => "jdbc:mysql://xxx:xxx:xxx:xxx:93306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull"
    jdbc_user => "test"
    jdbc_password => "1"

    # Location of the MySQL driver jar.
    jdbc_driver_library => "/elk/logstash-6.5.1/logstash-core/lib/jars/mysql-connector-java-8.0.13.jar"
    # MySQL driver class. The jar above is Connector/J 8.x, whose driver class
    # is com.mysql.cj.jdbc.Driver (com.mysql.jdbc.Driver is the deprecated name).
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"

    #use_column_value => true
    #tracking_column => id
    record_last_run => true
    #last_run_metadata_path => "/elk/logstash-6.5.1/bin/config_mysql/flag/axh_flag/axh_user.txt"

    # A SQL file can be used instead of an inline statement.
    #statement_filepath => "/home/elk1/software/logstash-6.5.3/bin/config-mysql/axh_user.sql"
    # Table to synchronize.
    statement => "select id,uid,add_time from axh_active_user where add_time like '2019-02-18%'"

    # Cron expression needs five fields (minute hour day-of-month month
    # day-of-week); the original had only four. Runs daily at 17:04.
    schedule => "04 17 * * *"

    # Index type.
    type => "axh_active_user"

    # Time zone of the timestamps stored in the database.
    # jdbc_default_timezone => "Asia/Shanghai"
  }
}
filter {
  # Store @timestamp shifted forward by 8 hours (8*60*60 seconds) in a
  # temporary field. The original code string was split as "+ 8" / "60*60",
  # which is not valid Ruby; the multiplication is restored here.
  ruby {
    code => "event.set('new_date', event.get('@timestamp').time.localtime + 8*60*60)"
  }

  # Copy the shifted value back into @timestamp.
  ruby {
    code => "event.set('@timestamp',event.get('new_date'))"
  }

  # Drop the temporary field so it is not indexed.
  mutate {
    remove_field => ["new_date"]
  }

  # Parse add_time as epoch milliseconds and write the result to @timestamp.
  # NOTE(review): add_time is selected from a SQL date/time column, so it may
  # not arrive as an epoch-millisecond value; confirm that UNIX_MS is the
  # right pattern, otherwise this filter will not update @timestamp.
  date {
    match => [ "add_time", "UNIX_MS"]
    target => "@timestamp"
  }
}
output {
  # Index every event into Elasticsearch, keyed by the row's id.
  elasticsearch {
    hosts => "xxx:xxx:xxx:xxx:xx"
    user => "elastic"
    password => "123"

    # Index name.
    index => "test_time_new"
    # Document type of the output index; taken from the event's "type" field.
    document_type => "%{type}"
    document_id => "%{id}"
  }

  # Echo each event to the console for inspection.
  stdout {
    codec => rubydebug
  }
}

2.conf

input {
  # Also accept events typed on standard input.
  stdin {
  }

  # Pull rows from MySQL over JDBC on a cron schedule.
  jdbc {
    # Database connection (host and port are redacted in this listing).
    # NOTE(review): no serverTimezone is given in the URL and
    # jdbc_default_timezone (below) is commented out; the "-14 HOUR"
    # adjustments in the statement look like a workaround for the resulting
    # offset. Confirm the database time zone and configure it explicitly.
    jdbc_connection_string => "jdbc:mysql://xxx:xxx:xxx:xxx:93306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull"
    jdbc_user => "test"
    jdbc_password => "1"

    # Location of the MySQL driver jar.
    jdbc_driver_library => "/elk/logstash-6.5.1/logstash-core/lib/jars/mysql-connector-java-8.0.13.jar"
    # MySQL driver class. The jar above is Connector/J 8.x, whose driver class
    # is com.mysql.cj.jdbc.Driver (com.mysql.jdbc.Driver is the deprecated name).
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"

    #use_column_value => true
    #tracking_column => id
    record_last_run => true
    #last_run_metadata_path => "/elk/logstash-6.5.1/bin/config_mysql/flag/axh_flag/axh_user.txt"

    # A SQL file can be used instead of an inline statement.
    #statement_filepath => "/home/elk1/software/logstash-6.5.3/bin/config-mysql/axh_user.sql"
    # Table to synchronize.
    # NOTE(review): "tg_status1" and "is_vip1" may have been "tg_status*1" and
    # "is_vip*1" before the asterisks were lost in the paste (compare
    # "is_login*1"); verify against the real column names.
    statement => "select id, reg_time,date_add(reg_time,INTERVAL -14 HOUR) as real_reg_time,tg_status1 as tg_status,mobile,tg_id,is_vip1 as is_vip,date_add(vip_start_time,INTERVAL -14 HOUR) as vip_start_time,is_login*1 as is_login,date_add(last_login_time,INTERVAL -14 HOUR) as last_login_time from axh_user where id>=1432356"

    # Runs daily at 14:23.
    schedule => "23 14 * * *"

    # Index type.
    type => "axh_user"

    # Time zone of the timestamps stored in the database.
    # jdbc_default_timezone => "Asia/Shanghai"
  }
}
filter {
# Ruby filter: the snippet evaluates the event timestamp's localtime but never
# calls event.set, so this code does not write any field on the event.
# NOTE(review): presumably meant to convert @timestamp to local time; confirm
# the intent, since the computed value is discarded.
ruby {
code => "event.timestamp.time.localtime"
}
}
output {
  # Index every event into Elasticsearch, keyed by the row's id.
  elasticsearch {
    hosts => "xxx:xxx:xxx:xxx:xx"
    user => "elastic"
    password => "123"

    # Index name.
    index => "test_time"
    # Document type of the output index; taken from the event's "type" field.
    document_type => "%{type}"
    document_id => "%{id}"
  }

  # Echo each event to the console as one JSON document per line.
  stdout {
    codec => json_lines
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.