This is the situation:
I want to import data from MySQL into Elasticsearch with Logstash, but the subobject data ends up duplicated. How can I filter out the duplicates?
This is the wrong data currently stored in Elasticsearch:
[{
  "_index": "user",
  "_type": "user",
  "_id": "2",
  "_score": 1,
  "_source": {
    "password": "1111",
    "types": [
      { "name": "111", "id": 1 },
      { "name": "333", "id": 3 }
    ],
    "@timestamp": "2018-02-06T18:05:00.669Z",
    "roles": [
      { "name": "vip user", "id": 2 },
      { "name": "vip user", "id": 2 }
    ],
    "@version": "1",
    "id": 2,
    "username": "vip"
  }
},
{
  "_index": "user",
  "_type": "user",
  "_id": "1",
  "_score": 1,
  "_source": {
    "password": "1111",
    "types": [
      { "name": "111", "id": 1 },
      { "name": "111", "id": 1 },
      { "name": "111", "id": 1 },
      { "name": "222", "id": 2 },
      { "name": "222", "id": 2 },
      { "name": "222", "id": 2 },
      { "name": "333", "id": 3 },
      { "name": "333", "id": 3 },
      { "name": "333", "id": 3 }
    ],
    "@timestamp": "2018-02-06T18:05:00.668Z",
    "roles": [
      { "name": "admin", "id": 1 },
      { "name": "vip user", "id": 2 },
      { "name": "normal user", "id": 3 },
      { "name": "admin", "id": 1 },
      { "name": "vip user", "id": 2 },
      { "name": "normal user", "id": 3 },
      { "name": "admin", "id": 1 },
      { "name": "vip user", "id": 2 },
      { "name": "normal user", "id": 3 }
    ],
    "@version": "1",
    "id": 1,
    "username": "admin"
  }
}]
"types" and "roles" is subobjects of user,it's duplicated.
This is the configuration file I start Logstash with. I use the logstash-filter-aggregate plugin, but I'm not sure I'm using it correctly:
input {
  jdbc {
    jdbc_driver_library => "/Users/wangbin/src/test/logstash-es-mysql/docker/logstash/jar/mysql-connector-java-5.1.45.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testes?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&createDatabaseIfNotExist=true"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_default_timezone => "Asia/Shanghai"
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
    jdbc_fetch_size => 10000
    connection_retry_attempts => 3
    connection_retry_attempts_wait_time => 1
    jdbc_pool_timeout => 5
    lowercase_column_names => true
    record_last_run => true
    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "id"
    statement_filepath => "/Users/wangbin/src/test/logstash-es-mysql/docker/logstash/config/user.sql"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['username'] = event.get('username')
      map['password'] = event.get('password')
      map['roles'] ||= []
      map['roles'] << {
        'id' => event.get('role_id'),
        'name' => event.get('role_name')
      }
      map['types'] ||= []
      map['types'] << {
        'id' => event.get('type_id'),
        'name' => event.get('type_name')
      }
      event.cancel()
    "
    push_previous_map_as_event => true
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "user"
    document_type => "user"
    document_id => "%{id}"
  }
}
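One idea I had is to deduplicate inside the aggregate code block itself, by only appending a role/type hash when it is not already in the map. A sketch of what I mean (same column aliases as in user.sql below; I haven't verified this is the idiomatic way):
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] ||= event.get('id')
      map['username'] ||= event.get('username')
      map['password'] ||= event.get('password')
      # build the subobject once, then append only if it was not collected yet
      role = { 'id' => event.get('role_id'), 'name' => event.get('role_name') }
      map['roles'] ||= []
      map['roles'] << role unless map['roles'].include?(role)
      type = { 'id' => event.get('type_id'), 'name' => event.get('type_name') }
      map['types'] ||= []
      map['types'] << type unless map['types'].include?(type)
      event.cancel()
    "
    push_previous_map_as_event => true
  }
}
Is that include? check the right approach, or does the plugin have a built-in way to do this?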
And user.sql is as follows:
SELECT
  u.id,
  u.password,
  u.username,
  u.type,
  r.id AS role_id,
  r.name AS role_name,
  t.id AS type_id,
  t.name AS type_name
FROM user u
LEFT JOIN user_role ru ON ru.user_id = u.id
LEFT JOIN role r ON r.id = ru.role_id
LEFT JOIN user_type ut ON ut.user_id = u.id
LEFT JOIN type t ON t.id = ut.type_id
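My suspicion is that the duplication comes from the query itself rather than from the aggregate filter: joining user_role and user_type in the same statement produces a cross product per user (for user 1, 3 roles × 3 types = 9 rows), so each role and each type reaches Logstash three times. Counting the joined rows per user should confirm this:
-- if the two one-to-many joins fan out, each user returns roles × types rows
SELECT u.id, COUNT(*) AS row_count
FROM user u
LEFT JOIN user_role ru ON ru.user_id = u.id
LEFT JOIN role r ON r.id = ru.role_id
LEFT JOIN user_type ut ON ut.user_id = u.id
LEFT JOIN type t ON t.id = ut.type_id
GROUP BY u.id;
If that is the cause, is deduplicating in the aggregate code (as sketched above) the right fix, or should I restructure the query instead, e.g. select the role rows and the type rows in two separate SELECTs combined with UNION ALL and skip the NULL columns in the aggregate code?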