Dear all,
I hope this message finds you well.
I am currently immersed in the implementation of bulk operations for handling documents. The process involves receiving PubSub messages and dynamically generating corresponding Elasticsearch documents. Each document is associated with an external version, and upon a successful store operation, I invoke the ack() method for the relevant PubSub message. Additionally, I handle cases where a version_conflict_engine_exception
error occurs.
However, I have a couple of queries that I would appreciate your insights on:
-
Implementing Retry Mechanism:
I am looking for guidance on implementing a retry mechanism for scenarios where issues arise, other than theversion_conflict_engine_exception
error. What would be the best approach to handle and retry such cases? -
Delayed PubSub Messages:
In a scenario where there are five PubSub messages, and the arrival of subsequent messages is delayed, what happens to these initial five documents? Will they be flushed automatically after a 3-second interval, as configured in the implementation?
Here is an overview of my current implementation:
class DemoService {
private BulkIngester<BulkContext> ingester;
private void intBulkIngester() {
ingester =
BulkIngester.of(
b ->
b.client(esClient)
.maxOperations(10)
.flushInterval(3, TimeUnit.SECONDS)
.listener(new BulkIngesterListener()));
}
public void consume(
final PubsSub message,
final PubSubAckReply pubSubAckReply) {
final BulkContext context =
BulkContext.builder().version(message.version()).pubSubAckReply(pubSubAckReply).build();
var document = message.getDocument();
bulkIngesterService.addDocument(
op ->
op.index(
idx ->
idx.index("DEMO_INDEX")
.document(document)
.version(message.version())
.id(document.getId())
.versionType(VersionType.External)),
context);
}
}
public class BulkIngesterListener<BulkContext>
implements BulkListener<BulkContext> {
@Override
public void beforeBulk(
final long executionId,
final BulkRequest request,
final List<BulkContext> bulkContexts) {
}
@Override
public void afterBulk(
final long executionId,
final BulkRequest request,
final List<BulkContext> bulkContexts,
final BulkResponse response) {
for (int i = 0; i < bulkContexts.size(); i++) {
final BulkResponseItem item = response.items().get(i);
final BulkContext context = bulkContexts.get(i);
final ErrorCause error = item.error();
if (error != null) {
if (error.type() != null && error.type().equals("version_conflict_engine_exception")) {
context.pubSubAckReply().ack();
} else {
context.pubSubAckReply().nack();
}
} else {
context.pubSubAckReply().ack();
}
}
}
@Override
public void afterBulk(
final long executionId,
final BulkRequest request,
final List<cars.ship.syncer.services.BulkContext> bulkContexts,
final Throwable failure) {
}
}
@Builder(toBuilder = true)
public record BulkContext(long version, PubSubAckReply pubSubAckReply) {}
Your guidance and suggestions on these points would be immensely valuable. Thank you for your time and assistance.