Two or more nested arrays without duplicates

I am building my ES indexes from data from our MySQL databases, using a jdbc plugin.

One of my tables has several many-to-many relationships that I need to import into the index (as ES does not handle many-to-many well). I do this with JOINs:

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "${OPTA_CONNECTION_STRING:jdbc:mysql://localhost:3306/oz?serverTimezone=UTC}"
    jdbc_user => "${DB_USER:root}"
    jdbc_password => "${DB_PASSWORD:root}"
    jdbc_paging_enabled => true
    tracking_column => "updatedAt"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/59 * * * * *"
    # ORDER BY testA.id is essential here, for two reasons:
    #  1. jdbc_paging_enabled wraps this statement in LIMIT/OFFSET pages;
    #     without an explicit ORDER BY, MySQL gives no row-order guarantee
    #     across pages, so rows can be repeated or skipped between pages.
    #  2. The aggregate filter below (push_previous_map_as_event) flushes a
    #     map as soon as the task_id changes, so all rows sharing one
    #     testA.id must arrive consecutively.
    statement => "SELECT testA.id as id, testA.dataA1 as dataA1, testA.dataA2 as dataA2, testA.updatedAt as updatedAt,
    testB.dataB1 as dataB1, testB.dataB2 as dataB2,
    testC.dataC1 as dataC1, testC.dataC2 as dataC2
    FROM testA

    LEFT JOIN aToB aToB
    ON aToB.idA = testA.id
    LEFT JOIN testB testB
    ON testB.id = aToB.idB

    LEFT JOIN aToC aToC
    ON aToC.idA = testA.id
    LEFT JOIN testC testC
    ON testC.id = aToC.idC

    WHERE UNIX_TIMESTAMP(testA.updatedAt) > :sql_last_value AND testA.updatedAt < NOW()
    ORDER BY testA.id"
  }
}
# Collapse the one-row-per-join-combination output of the SQL statement into
# a single event per testA.id with nested testB/testC arrays.
#
# NOTE(review): push_previous_map_as_event requires the pipeline to run with
# a single worker (pipeline.workers => 1) AND rows sorted by the task_id;
# with multiple workers, events for the same id are processed concurrently
# and maps are flushed early, producing exactly the "marked as duplicate /
# not written" symptoms described above.
filter {
  aggregate {
    task_id => "%{id}"
    code => "
        map['id'] = event.get('id')
        map['dataA1'] = event.get('dataA1')
        map['dataA2'] = event.get('dataA2')

        map['testB'] ||= []
        testBEntry = {
            'dataB1' => event.get('dataB1'),
            'dataB2' => event.get('dataB2')
        }
        # Skip all-nil entries (LEFT JOIN found no testB match for this row)
        # and entries already collected from another join combination.
        unless testBEntry.values.all?(&:nil?) || map['testB'].include?(testBEntry)
          map['testB'] << testBEntry
        end

        map['testC'] ||= []
        testCEntry = {
            'dataC1' => event.get('dataC1'),
            'dataC2' => event.get('dataC2')
        }
        unless testCEntry.values.all?(&:nil?) || map['testC'].include?(testCEntry)
          map['testC'] << testCEntry
        end

        # Drop the raw per-row event; only the aggregated map is emitted.
        event.cancel()"
    push_previous_map_as_event => true
    timeout => 30
  }
}
# One document per aggregated testA row. Keying document_id on testA.id makes
# the scheduled re-import idempotent: a re-run overwrites the existing
# document instead of creating a duplicate.
output {
    elasticsearch {
      hosts => ["${ES_HOST:http://localhost:9200}"]
      user => "${ES_USER:elastic}"
      password => "${ES_PASSWORD:changeme}"
      index => "test"
      document_id => "test_%{id}"
  }
}

So what I tried at first was to simply add the values, but since a join repeats the parent columns for every combination of joined rows, there was a lot of duplication in the resulting index.

In this example, I want my data A to contain an array of 1 dataB, and 2 dataC. Problem is, since the join produces two lines (as there are two dataC), I will have a duplicated dataB.

The code above is my current attempt to fix this: I want to add an entry only if it is not already in the map. The problem is, I think the maps are somehow shared, because after the first insert the next ones are marked as duplicates, and some are not even written.

What is the correct way to import multiple many-to-many entries without duplicates and without missing some entries ?

MySQL tables ready to import:

# Dump of table aToB
# ------------------------------------------------------------

DROP TABLE IF EXISTS `aToB`;

-- Junction table for the testA <-> testB many-to-many relationship.
CREATE TABLE `aToB` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `idA` int(11) DEFAULT NULL,
  `idB` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  -- Index both join columns: the Logstash import statement joins on
  -- aToB.idA = testA.id and testB.id = aToB.idB; without these, every
  -- scheduled import full-scans the junction table.
  KEY `aToB_idA_idx` (`idA`),
  KEY `aToB_idB_idx` (`idB`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

LOCK TABLES `aToB` WRITE;
/*!40000 ALTER TABLE `aToB` DISABLE KEYS */;

INSERT INTO `aToB` (`id`, `idA`, `idB`)
VALUES
	(1,1,1),
	(3,2,2);

/*!40000 ALTER TABLE `aToB` ENABLE KEYS */;
UNLOCK TABLES;


# Dump of table aToC
# ------------------------------------------------------------

DROP TABLE IF EXISTS `aToC`;

-- Junction table for the testA <-> testC many-to-many relationship.
CREATE TABLE `aToC` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `idA` int(11) DEFAULT NULL,
  `idC` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  -- Index both join columns: the Logstash import statement joins on
  -- aToC.idA = testA.id and testC.id = aToC.idC.
  KEY `aToC_idA_idx` (`idA`),
  KEY `aToC_idC_idx` (`idC`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

LOCK TABLES `aToC` WRITE;
/*!40000 ALTER TABLE `aToC` DISABLE KEYS */;

INSERT INTO `aToC` (`id`, `idA`, `idC`)
VALUES
	(1,1,1),
	(2,1,2),
	(3,2,1),
	(4,2,2);

/*!40000 ALTER TABLE `aToC` ENABLE KEYS */;
UNLOCK TABLES;


# Dump of table testA
# ------------------------------------------------------------

DROP TABLE IF EXISTS `testA`;

-- Root entity of the import; one ES document is produced per testA row.
CREATE TABLE `testA` (
  `id` int(6) unsigned NOT NULL AUTO_INCREMENT,
  `dataA1` int(6) DEFAULT NULL,
  `dataA2` int(6) DEFAULT NULL,
  `updatedAt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  -- updatedAt drives the incremental-import predicate. NOTE: the query
  -- wraps it in UNIX_TIMESTAMP(testA.updatedAt) > :sql_last_value, which is
  -- not sargable; the index only helps the testA.updatedAt < NOW() bound
  -- unless the predicate is rewritten as a bare range on the column.
  KEY `testA_updatedAt_idx` (`updatedAt`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

LOCK TABLES `testA` WRITE;
/*!40000 ALTER TABLE `testA` DISABLE KEYS */;

INSERT INTO `testA` (`id`, `dataA1`, `dataA2`, `updatedAt`)
VALUES
	(1,1,2,'2020-12-10 14:57:53'),
	(2,3,4,'2020-12-10 14:57:53');

/*!40000 ALTER TABLE `testA` ENABLE KEYS */;
UNLOCK TABLES;


# Dump of table testB
# ------------------------------------------------------------

DROP TABLE IF EXISTS `testB`;

-- Entity table reached from testA via the aToB junction table.
CREATE TABLE `testB` (
  `id` int(6) unsigned NOT NULL AUTO_INCREMENT,
  `dataB1` int(6) DEFAULT NULL,
  `dataB2` int(6) DEFAULT NULL,
  `updatedAt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

LOCK TABLES `testB` WRITE;
/*!40000 ALTER TABLE `testB` DISABLE KEYS */;

-- Seed rows; id 3 is not referenced by any aToB row above.
INSERT INTO `testB` (`id`, `dataB1`, `dataB2`, `updatedAt`)
VALUES (1, 5, 6, '2020-12-10 14:57:53'),
       (2, 7, 8, '2020-12-10 14:57:53'),
       (3, 666, 6677, '2020-12-10 14:57:53');

/*!40000 ALTER TABLE `testB` ENABLE KEYS */;
UNLOCK TABLES;


# Dump of table testC
# ------------------------------------------------------------

DROP TABLE IF EXISTS `testC`;

-- Entity table reached from testA via the aToC junction table.
CREATE TABLE `testC` (
  `id` int(6) unsigned NOT NULL AUTO_INCREMENT,
  `dataC1` int(6) DEFAULT NULL,
  `dataC2` int(6) DEFAULT NULL,
  `updatedAt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

LOCK TABLES `testC` WRITE;
/*!40000 ALTER TABLE `testC` DISABLE KEYS */;

-- Seed rows; id 3 is not referenced by any aToC row above.
INSERT INTO `testC` (`id`, `dataC1`, `dataC2`, `updatedAt`)
VALUES (1, 9, 10, '2020-12-10 14:57:53'),
       (2, 11, 12, '2020-12-10 14:57:53'),
       (3, 888, 999, '2020-12-10 14:57:53');

/*!40000 ALTER TABLE `testC` ENABLE KEYS */;
UNLOCK TABLES;

Here's the join in MySQL

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.