Import data from mysql to elasticsearch using logstash ,and the subobject are duplicate?


(wangbin) #1

This is the case:
I want to import data use logstash from mysql to elasticsearch,the subobject data is duplicated.How can I filter the data?

This is the error data in elasticsearch:

error json:

[{
		"_index": "user",
		"_type": "user",
		"_id": "2",
		"_score": 1,
		"_source": {
			"password": "1111",
			"types": [{
					"name": "111",
					"id": 1
				},
				{
					"name": "333",
					"id": 3
				}
			],
			"@timestamp": "2018-02-06T18:05:00.669Z",
			"roles": [{
					"name": "vip用户",
					"id": 2
				},
				{
					"name": "vip用户",
					"id": 2
				}
			],
			"@version": "1",
			"id": 2,
			"username": "vip"
		}
	},
	{
		"_index": "user",
		"_type": "user",
		"_id": "1",
		"_score": 1,
		"_source": {
			"password": "1111",
			"types": [{
					"name": "111",
					"id": 1
				},
				{
					"name": "111",
					"id": 1
				},
				{
					"name": "111",
					"id": 1
				},
				{
					"name": "222",
					"id": 2
				},
				{
					"name": "222",
					"id": 2
				},
				{
					"name": "222",
					"id": 2
				},
				{
					"name": "333",
					"id": 3
				},
				{
					"name": "333",
					"id": 3
				},
				{
					"name": "333",
					"id": 3
				}
			],
			"@timestamp": "2018-02-06T18:05:00.668Z",
			"roles": [{
					"name": "管理员",
					"id": 1
				},
				{
					"name": "vip用户",
					"id": 2
				},
				{
					"name": "普通用户",
					"id": 3
				},
				{
					"name": "管理员",
					"id": 1
				},
				{
					"name": "vip用户",
					"id": 2
				},
				{
					"name": "普通用户",
					"id": 3
				},
				{
					"name": "管理员",
					"id": 1
				},
				{
					"name": "vip用户",
					"id": 2
				},
				{
					"name": "普通用户",
					"id": 3
				}
			],
			"@version": "1",
			"id": 1,
			"username": "admin"
		}
	}
]

"types" and "roles" is subobjects of user,it's duplicated.

This is the configuration file when start logstash: I use logstash-filter-aggregate plugin, but I don't know the right way to use is correct.

input {
  jdbc {
    jdbc_driver_library => "/Users/wangbin/src/test/logstash-es-mysql/docker/logstash/jar/mysql-connector-java-5.1.45.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testes?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&createDatabaseIfNotExist=true"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_default_timezone => "Asia/Shanghai"
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
    jdbc_fetch_size => 10000
    connection_retry_attempts => 3
    connection_retry_attempts_wait_time => 1
    jdbc_pool_timeout => 5
    lowercase_column_names => true
    record_last_run => true
    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "id"
    statement_filepath => "/Users/wangbin/src/test/logstash-es-mysql/docker/logstash/config/user.sql"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['username'] = event.get('username')
      map['password'] = event.get('password')
      map['roles'] ||= []
      map['roles'] << {
        'id' => event.get('role_id'),
        'name' => event.get('role_name')
      }
      map['types'] ||= []
      map['types'] << {
        'id' => event.get('type_id'),
        'name' => event.get('type_name')
      }
      event.cancel()
    "
    push_previous_map_as_event => true
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "user"
    document_type => "user"
    document_id => "%{id}"
  }
}

And the user.sql follows:

SELECT
  u.id,
  u.password,
  u.username,
  u.type,
  r.id   AS role_id,
  r.name AS role_name,
  t.id   AS type_id,
  t.name AS type_name
FROM user u
  LEFT JOIN user_role ru ON ru.user_id = u.id
  LEFT JOIN role r ON r.id = ru.role_id
  LEFT JOIN user_type ut ON ut.user_id = u.id
  LEFT JOIN type t ON t.id = ut.type_id

(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.