I am building my Elasticsearch indexes from our MySQL databases, using the jdbc input plugin.
One of my entities has several many-to-many relations that I need to denormalize into the index (since Elasticsearch does not handle many-to-many well). I do this with JOINs in the SELECT statement:
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "${OPTA_CONNECTION_STRING:jdbc:mysql://localhost:3306/oz?serverTimezone=UTC}"
    jdbc_user => "${DB_USER:root}"
    jdbc_password => "${DB_PASSWORD:root}"
    jdbc_paging_enabled => true
    tracking_column => "updatedAt"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/59 * * * * *"
    statement => "SELECT testA.id as id, testA.dataA1 as dataA1, testA.dataA2 as dataA2, testA.updatedAt as updatedAt,
                  testB.dataB1 as dataB1, testB.dataB2 as dataB2,
                  testC.dataC1 as dataC1, testC.dataC2 as dataC2
                  FROM testA
                  LEFT JOIN aToB aToB ON aToB.idA = testA.id
                  LEFT JOIN testB testB ON testB.id = aToB.idB
                  LEFT JOIN aToC aToC ON aToC.idA = testA.id
                  LEFT JOIN testC testC ON testC.id = aToC.idC
                  WHERE UNIX_TIMESTAMP(testA.updatedAt) > :sql_last_value AND testA.updatedAt < NOW()"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      p event
      map['id'] = event.get('id')
      map['dataA1'] = event.get('dataA1')
      map['dataA2'] = event.get('dataA2')

      map['testB'] ||= []
      testBEntry = {
        'dataB1' => event.get('dataB1'),
        'dataB2' => event.get('dataB2')
      }
      p testBEntry
      if !map['testB'].include?(testBEntry)
        map['testB'] << testBEntry
      else
        puts 'Duplicate'
      end

      map['testC'] ||= []
      testCEntry = {
        'dataC1' => event.get('dataC1'),
        'dataC2' => event.get('dataC2')
      }
      p testCEntry
      if !map['testC'].include?(testCEntry)
        map['testC'] << testCEntry
      else
        puts 'Duplicate'
      end

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 30
  }
}
output {
  elasticsearch {
    hosts => ["${ES_HOST:http://localhost:9200}"]
    user => "${ES_USER:elastic}"
    password => "${ES_PASSWORD:changeme}"
    index => "test"
    document_id => "test_%{id}"
  }
}
What I tried at first was to simply add the values, but since the JOIN repeats the same columns on every row, the resulting index contained a lot of duplication.
In this example, I want my document A to contain an array of one dataB and two dataC. The problem is that, since the JOIN produces two rows for it (because there are two dataC), dataB ends up duplicated.
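Concretely, for id 1 the SELECT above returns two rows (values taken from the table dumps at the end of this question):

id | dataA1 | dataA2 | dataB1 | dataB2 | dataC1 | dataC2
---|--------|--------|--------|--------|--------|-------
 1 |      1 |      2 |      5 |      6 |      9 |     10
 1 |      1 |      2 |      5 |      6 |     11 |     12

Both rows carry the same dataB pair, while the document I want to end up with looks roughly like this:

{
  "id": 1,
  "dataA1": 1,
  "dataA2": 2,
  "testB": [ { "dataB1": 5, "dataB2": 6 } ],
  "testC": [ { "dataC1": 9, "dataC2": 10 }, { "dataC1": 11, "dataC2": 12 } ]
}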
The code above is my current attempt to fix this: I only add an entry to the map if it is not already there. The problem is that the maps seem to be shared somehow, because after the first insert the following entries are flagged as duplicates, and some are not even written.
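To rule out the deduplication logic itself, here is a minimal standalone Ruby sketch of the same pattern, with the two rows from above hard-coded (an isolated test outside of Logstash, not the actual pipeline):

map = {}
rows = [
  { 'dataB1' => 5, 'dataB2' => 6, 'dataC1' => 9,  'dataC2' => 10 },
  { 'dataB1' => 5, 'dataB2' => 6, 'dataC1' => 11, 'dataC2' => 12 }
]
rows.each do |row|
  # same pattern as in the aggregate code block above
  map['testB'] ||= []
  entryB = { 'dataB1' => row['dataB1'], 'dataB2' => row['dataB2'] }
  map['testB'] << entryB unless map['testB'].include?(entryB)

  map['testC'] ||= []
  entryC = { 'dataC1' => row['dataC1'], 'dataC2' => row['dataC2'] }
  map['testC'] << entryC unless map['testC'].include?(entryC)
end
p map
# => {"testB"=>[{"dataB1"=>5, "dataB2"=>6}],
#     "testC"=>[{"dataC1"=>9, "dataC2"=>10}, {"dataC1"=>11, "dataC2"=>12}]}

Run in isolation, this produces exactly the document shape I want, so the logic itself seems correct and the issue looks like it comes from how the aggregate maps are handled across events.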
What is the correct way to import multiple many-to-many relations without duplicating entries and without missing any?
MySQL dump of the test tables, ready to import:
# Dump of table aToB
# ------------------------------------------------------------
DROP TABLE IF EXISTS `aToB`;
CREATE TABLE `aToB` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`idA` int(11) DEFAULT NULL,
`idB` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `aToB` WRITE;
/*!40000 ALTER TABLE `aToB` DISABLE KEYS */;
INSERT INTO `aToB` (`id`, `idA`, `idB`)
VALUES
(1,1,1),
(3,2,2);
/*!40000 ALTER TABLE `aToB` ENABLE KEYS */;
UNLOCK TABLES;
# Dump of table aToC
# ------------------------------------------------------------
DROP TABLE IF EXISTS `aToC`;
CREATE TABLE `aToC` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`idA` int(11) DEFAULT NULL,
`idC` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `aToC` WRITE;
/*!40000 ALTER TABLE `aToC` DISABLE KEYS */;
INSERT INTO `aToC` (`id`, `idA`, `idC`)
VALUES
(1,1,1),
(2,1,2),
(3,2,1),
(4,2,2);
/*!40000 ALTER TABLE `aToC` ENABLE KEYS */;
UNLOCK TABLES;
# Dump of table testA
# ------------------------------------------------------------
DROP TABLE IF EXISTS `testA`;
CREATE TABLE `testA` (
`id` int(6) unsigned NOT NULL AUTO_INCREMENT,
`dataA1` int(6) DEFAULT NULL,
`dataA2` int(6) DEFAULT NULL,
`updatedAt` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `testA` WRITE;
/*!40000 ALTER TABLE `testA` DISABLE KEYS */;
INSERT INTO `testA` (`id`, `dataA1`, `dataA2`, `updatedAt`)
VALUES
(1,1,2,'2020-12-10 14:57:53'),
(2,3,4,'2020-12-10 14:57:53');
/*!40000 ALTER TABLE `testA` ENABLE KEYS */;
UNLOCK TABLES;
# Dump of table testB
# ------------------------------------------------------------
DROP TABLE IF EXISTS `testB`;
CREATE TABLE `testB` (
`id` int(6) unsigned NOT NULL AUTO_INCREMENT,
`dataB1` int(6) DEFAULT NULL,
`dataB2` int(6) DEFAULT NULL,
`updatedAt` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `testB` WRITE;
/*!40000 ALTER TABLE `testB` DISABLE KEYS */;
INSERT INTO `testB` (`id`, `dataB1`, `dataB2`, `updatedAt`)
VALUES
(1,5,6,'2020-12-10 14:57:53'),
(2,7,8,'2020-12-10 14:57:53'),
(3,666,6677,'2020-12-10 14:57:53');
/*!40000 ALTER TABLE `testB` ENABLE KEYS */;
UNLOCK TABLES;
# Dump of table testC
# ------------------------------------------------------------
DROP TABLE IF EXISTS `testC`;
CREATE TABLE `testC` (
`id` int(6) unsigned NOT NULL AUTO_INCREMENT,
`dataC1` int(6) DEFAULT NULL,
`dataC2` int(6) DEFAULT NULL,
`updatedAt` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
LOCK TABLES `testC` WRITE;
/*!40000 ALTER TABLE `testC` DISABLE KEYS */;
INSERT INTO `testC` (`id`, `dataC1`, `dataC2`, `updatedAt`)
VALUES
(1,9,10,'2020-12-10 14:57:53'),
(2,11,12,'2020-12-10 14:57:53'),
(3,888,999,'2020-12-10 14:57:53');
/*!40000 ALTER TABLE `testC` ENABLE KEYS */;
UNLOCK TABLES;