BulkInsert et paramétrage de elasticsearch.yml

Bonjour,

Pour un bulkInsert de 1000 requêtes
...
.setBulkActions(1000).setConcurrentRequests(1).build();

Faut il positionner le "threadpool.bulk.queue_size" à 1000 pour éviter les erreurs du type

EsRejectedExecutionException[rejected execution (queue capacity 100) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@3b031126]

?

Sans ça beaucoup de documents ne sont pas insérés.

A quoi sert le threadpool.bulk.size dans ce cas ? Est-ce la taille initiale du pool ?

Merci d'avance pour vos réponses.

Blured.

threadpool.bulk.size sert à indique combien de requêtes de type BULK elasticsearch peut traiter en parallèle. Quelque soit le nombre d'opération à l'intérieur de chaque requête de type bulk.

Je te déconseille de modifier cette valeur.

Ok merci de ta réponse rapide. Concernant le queue_size je suis dans le vrai ?

Désolé. Ma réponse est incorrecte. Je voulais parler de threadpool.bulk.queue_size.

D'une manière générale, ne pas toucher à threadpool.bulk.*.

Mais si je n'y touche pas et que je tente d'insérer 50000 documents via ce type d'algo, j’obtiens des myriades d'erreurs de type :

EsRejectedExecutionException[rejected execution (queue capacity 100) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@3b031126]
// initialisation du bulkProcessor
bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
			
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {				
  if (response.hasFailures()) {
    for (BulkItemResponse item : response.getItems()) {
      LOGGER.error("Processing to index \"{}\" with type \"{}\" failed for entity id {} with message {}", item.getIndex(), item.getType(), item.getId(), item.getFailureMessage());
      LOGGER.error(response.buildFailureMessage());
   }
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable afterBulk)     
{
  LOGGER.error("Error while index in ELS failure:"+afterBulk.getMessage(), afterBulk);
}
}).setBulkActions(5000).setConcurrentRequests(1).build();
// Exécution de N fois ce code (autant de fois que j'aurais d'enregistrement à insérer)
bulkProcessor.add(new IndexRequest(table, type).source(jsonBuilder()
 .startObject()
 .field(CODE,(intCode != null ? intCode.toString() : strCode))
 .field(HASH,  newHash)
 .field(TEXT, text)
.endObject()));

Y aurait il quelque chose de déconnant dans ce code ?

A+,
Blured.

Non. Comme ça, le code me semble correct.

D'autres informations sur la configuration de tes index ?
As-tu mis énormément de replicas par exemple ?

J'ai laissé tout par défaut sinon. Pour l'instant 1 serveur Elastic avec 12 indexes et dans chaque index une dizaines de types. Donc à priori il y a un seul replica.

En fait il y a 29 indexes
soit 145 shards

Peut être que ça joue ?

Veux-tu des traces particulières, genre _search_shards ou autres ?

si tu indexes dans un seul index à la fois, ça ne devrait pas poser de pb mais si effectivement tu surcharges ta machine en indexant partout, tu risques d'avoir ce genre d'erreur.

Si tu n'as pas besoin de 5 shards par index, commence par réduire ça en 1 shard.
30 shards sur une machine me semble plus raisonnable.

En effet, j'ai une série de data que j'indexe dans plusieurs index (un index par langue). Je vais essayer avec 1 shard par index...

C'était ça, trop de shard donc saturation du pool.

Merci !