Duplication of data using the aggregate plugin

The first time we used Logstash to process our data today, some of the data ended up duplicated and some was lost. Is this a bug in Logstash, or have I made a mistake somewhere?
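
One thing I was not sure about: the aggregate filter documentation says it only works correctly when Logstash runs a single pipeline worker, otherwise events can be processed out of order and the maps can get mixed up. Could that be part of my problem? If so, I assume I would have to force it, e.g. in logstash.yml (or with -w 1 on the command line):

pipeline.workers: 1
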
My logstash.conf is:

input {
  stdin {}
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.88.128:3306/gwbnsh_common"
    jdbc_driver_library => "/usr/local/mysql-connector-java-5.1.7-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_user => "root"
    jdbc_password => "123"
    clean_run => false
    last_run_metadata_path => "/usr/local/logstash-6.2.1/template/infos/system_district"
    record_last_run => true
    use_column_value => true
    tracking_column => "id"
    lowercase_column_names => false
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"
    #statement_filepath => "/usr/local/logstash-6.2.1/bin/logstash_mysql_sync/mysql_sync.sql"
statement => "SELECT
t1.ID id,
t1.NAME name,
t1.BOSS_CODE bossCode,
t1.NAMESPACE_CODE namespaceCode,
t1.CREATOR creator,
DATE_FORMAT(t1.CREATED_DATE, '%Y-%m-%d %H:%I:%S') createdDate,
t1.UPDATER updater,
DATE_FORMAT(t1.UPDATED_DATE, '%Y-%m-%d %H:%I:%S') updatedDate,
t1.IS_DEL isDel,
t2.ID id_s,
t2.NAME name_s,
t2.REGION_ID rid_s,
t3.MANGER m_s,
t2.REMARK r_s,
t2.BOSS_CODE b_s,
t2.NAMESPACE_CODE n_s,
t2.CREATOR c_s,
DATE_FORMAT(t2.CREATED_DATE, '%Y-%m-%d %H:%I:%S') cd_s,
t2.UPDATER u_s,
DATE_FORMAT(t2.UPDATED_DATE, '%Y-%m-%d %H:%I:%S') ud_s,
t2.IS_DEL i_s,
t3.ID id_f,
t3.NAME name_f,
t3.REGION_ID rid_f,
t3.MANGER m_f,
t3.REMARK r_f,
t3.BOSS_CODE b_f,
t3.NAMESPACE_CODE n_f,
t3.CREATOR c_f,
DATE_FORMAT(t3.CREATED_DATE, '%Y-%m-%d %H:%I:%S') cd_f,
t3.UPDATER u_f,
DATE_FORMAT(t3.UPDATED_DATE, '%Y-%m-%d %H:%I:%S') ud_f,
t3.IS_DEL i_f
FROM
system_district t1,
system_service_station t2,
system_fix_station t3
WHERE
t1.ID = t2.REGION_ID
AND t1.ID = t3.REGION_ID"
    schedule => "* * * * *"
    type => "system_district"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['name'] = event.get('name')
      map['bossCode'] = event.get('bossCode')
      map['namespaceCode'] = event.get('namespaceCode')
      map['creator'] = event.get('creator')
      map['createdDate'] = event.get('createdDate')
      map['updater'] = event.get('updater')
      map['updatedDate'] = event.get('updatedDate')
      map['isDel'] = event.get('isDel')
      # initialise the nested arrays once per task, then append one entry per joined row
      map['systemServiceStation'] ||= []
      map['systemServiceStation'] << {'id' => event.get('id_s'), 'name' => event.get('name_s'), 'regionId' => event.get('rid_s'), 'manger' => event.get('m_s'), 'remark' => event.get('r_s'), 'bossCode' => event.get('b_s'), 'namespaceCode' => event.get('n_s'), 'creator' => event.get('c_s'), 'createdDate' => event.get('cd_s'), 'updater' => event.get('u_s'), 'updatedDate' => event.get('ud_s'), 'isDel' => event.get('i_s')}
      map['systemFixStation'] ||= []
      map['systemFixStation'] << {'id' => event.get('id_f'), 'name' => event.get('name_f'), 'regionId' => event.get('rid_f'), 'manger' => event.get('m_f'), 'remark' => event.get('r_f'), 'bossCode' => event.get('b_f'), 'namespaceCode' => event.get('n_f'), 'creator' => event.get('c_f'), 'createdDate' => event.get('cd_f'), 'updater' => event.get('u_f'), 'updatedDate' => event.get('ud_f'), 'isDel' => event.get('i_f')}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
  mutate {
    remove_field => ["@timestamp", "@version", "tags"]
    remove_tag => ["tags"]
  }
}
output {
  elasticsearch {
    hosts => ["192.168.88.128:9200", "192.168.88.129:9200"]
    action => "index"
    index => "system_district"
    document_id => "%{id}"
    manage_template => true
    template => "/usr/local/logstash-6.2.1/template/mappings/system_district.json"
    template_name => "system_district.json"
    template_overwrite => true
  }
  stdout {
    codec => json_lines
  }
}
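
For reference, this is the shape of the document I expect to get in Elasticsearch: one aggregated event per system_district row, with the joined service stations and fix stations collected into nested arrays (a sketch based on the map built in the aggregate filter; all values below are placeholders):

{
  "id": 1,
  "name": "...",
  "bossCode": "...",
  "namespaceCode": "...",
  "creator": "...",
  "createdDate": "...",
  "updater": "...",
  "updatedDate": "...",
  "isDel": 0,
  "systemServiceStation": [
    { "id": 10, "name": "...", "regionId": 1, "manger": "...", "remark": "...", "bossCode": "...", "namespaceCode": "...", "creator": "...", "createdDate": "...", "updater": "...", "updatedDate": "...", "isDel": 0 }
  ],
  "systemFixStation": [
    { "id": 20, "name": "...", "regionId": 1, "manger": "...", "remark": "...", "bossCode": "...", "namespaceCode": "...", "creator": "...", "createdDate": "...", "updater": "...", "updatedDate": "...", "isDel": 0 }
  ]
}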
I would be grateful if anyone could give me some advice.
