Seeking Guidance on Implementing Retry Mechanism and Handling Delayed PubSub Messages in Bulk Document Operations

Dear all,

I hope this message finds you well.

I am currently immersed in the implementation of bulk operations for handling documents. The process involves receiving PubSub messages and dynamically generating corresponding Elasticsearch documents. Each document is associated with an external version, and upon a successful store operation, I invoke the ack() method for the relevant PubSub message. Additionally, I handle cases where a version_conflict_engine_exception error occurs.

However, I have a couple of queries that I would appreciate your insights on:

  1. Implementing Retry Mechanism:
    I am looking for guidance on implementing a retry mechanism for scenarios where issues arise, other than the version_conflict_engine_exception error. What would be the best approach to handle and retry such cases?

  2. Delayed PubSub Messages:
    In a scenario where there are five PubSub messages, and the arrival of subsequent messages is delayed, what happens to these initial five documents? Will they be flushed automatically after a 3-second interval, as configured in the implementation?

Here is an overview of my current implementation:

class DemoService {

private BulkIngester<BulkContext> ingester;

private void intBulkIngester() {

ingester =
        BulkIngester.of(
            b ->
                b.client(esClient)
                    .maxOperations(10)
                    .flushInterval(3, TimeUnit.SECONDS)
                    .listener(new BulkIngesterListener()));
}

  public void consume(
      final PubsSub message,
      final PubSubAckReply pubSubAckReply) {
      
      final BulkContext context =
        BulkContext.builder().version(message.version()).pubSubAckReply(pubSubAckReply).build();

var document = message.getDocument();
    bulkIngesterService.addDocument(
        op ->
            op.index(
                idx ->
                    idx.index("DEMO_INDEX")
                        .document(document)
                        .version(message.version())
                        .id(document.getId())
                        .versionType(VersionType.External)),
        context);
      
      }

}

public class BulkIngesterListener<BulkContext>
    implements BulkListener<BulkContext> {

  @Override
  public void beforeBulk(
      final long executionId,
      final BulkRequest request,
      final List<BulkContext> bulkContexts) {
    
  }

  @Override
  public void afterBulk(
      final long executionId,
      final BulkRequest request,
      final List<BulkContext> bulkContexts,
      final BulkResponse response) {
   

    for (int i = 0; i < bulkContexts.size(); i++) {
      final BulkResponseItem item = response.items().get(i);

      final BulkContext context = bulkContexts.get(i);

      final ErrorCause error = item.error();
      if (error != null) {

        if (error.type() != null && error.type().equals("version_conflict_engine_exception")) {
          context.pubSubAckReply().ack();

        } else {
          context.pubSubAckReply().nack();
        }
      } else {
        context.pubSubAckReply().ack();
      }
    }
  }

  @Override
  public void afterBulk(
      final long executionId,
      final BulkRequest request,
      final List<cars.ship.syncer.services.BulkContext> bulkContexts,
      final Throwable failure) {
    
  }
}

@Builder(toBuilder = true)
public record BulkContext(long version, PubSubAckReply pubSubAckReply) {}

Your guidance and suggestions on these points would be immensely valuable. Thank you for your time and assistance.

Hi @Christian_Dahlqvist @dadoonet do you have an idea about the above topic?

If I did I might have answered. Note that this is a community forum, so it is considered rude to ping people not already involved in a thread. You do not know who has experience in what area of the stack. Please be patient as it can take a few days to get a response, in particular for questions around areas where there are a limited number of people with relevant experience.

I acknowledge that, but I made a mistake with the category and am unable to relocate the topic to the correct one. I assumed you would give me some advice because you and I had discussed some of the aspect in another thread of mine. I apologize

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.