BulkProcessor best practices

Hey guys,

I'm using the BulkProcessor to index documents in Elasticsearch. It has
definitely improved my indexing throughput.

Anyway, I was wondering whether there are any best practices for exception
handling with the bulk processor. For example, it would be good to schedule
retries in certain scenarios.

At the moment all I'm doing is logging. Could someone point me to a
resource with an example of handling a NodeNotConnectedException and doing
a retry? I don’t know how to access the contents of the bulkProcessor from
within the afterBulk method in the Listener.

        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            if (bulkResponse.hasFailures()) {
                Log.error("We have failures");
                for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                    if (bulkItemResponse.isFailed()) {
                        Log.error(bulkItemResponse.getId() + " failed with message: " + bulkItemResponse.getFailureMessage());
                    }
                }
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable t) {
            Log.error("An exception occurred while indexing", t);

            // How do I add this back to the list of requests?

        }
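
For the "retries in certain scenarios" part, one option is to separate transient failures from permanent ones before deciding whether to retry. The following is only a minimal, library-free sketch: RetryPolicy and isRetryable are hypothetical names, not part of the Elasticsearch client, and the caller supplies whichever exception classes it considers transient (for example the NodeNotConnectedException mentioned above).

```java
// Hypothetical helper, not part of the Elasticsearch client.
final class RetryPolicy {
    // Exception classes the caller considers transient (an assumption of this sketch).
    private final Class<?>[] transientTypes;

    RetryPolicy(Class<?>... transientTypes) {
        this.transientTypes = transientTypes.clone();
    }

    /** Returns true if the failure, or any exception in its cause chain, is of a transient type. */
    boolean isRetryable(Throwable failure) {
        for (Throwable cur = failure; cur != null; cur = cur.getCause()) {
            for (Class<?> type : transientTypes) {
                if (type.isInstance(cur)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Inside afterBulk(long, BulkRequest, Throwable) this could be used as policy.isRetryable(t), so that only connection-type failures are retried and anything else is just logged.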

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/7e7d476d-6a8d-4a82-bbd0-e331a08d1bb4%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

If you make your bulk processor a final field, I think this could work:

private final BulkProcessor bulk;

CrmApp() {
    Client esClient = new TransportClient(
            ImmutableSettings.builder().put("cluster.name", "devoxx")
    ).addTransportAddress(
            new InetSocketTransportAddress("127.0.0.1", 9300)
    );

    bulk = BulkProcessor.builder(esClient, new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            logger.debug("[{}] going to execute {} requests", executionId, request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            logger.debug("[{}] ok", executionId);
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            logger.warn("We have a problem", failure);
            // Re-queue each action individually. This assumes the 1.x client API,
            // where the processor accepts single index/delete/update actions
            // rather than a whole BulkRequest.
            for (ActionRequest action : request.requests()) {
                bulk.add(action);
            }
        }
    })
            .setBulkActions(pageSize)
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .build();
}
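
One caveat with re-adding from inside afterBulk is that nothing limits the retries: if the node stays down, the same actions are queued again and again. A rough sketch of capping the attempts and backing off between them is below. It is deliberately library-free: BoundedRetry, sendWithRetry and the sender callback are hypothetical names standing in for whatever actually submits the batch, not Elasticsearch APIs.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: retries a batch a limited number of times with exponential backoff.
final class BoundedRetry {

    /**
     * Calls sender with the batch until it succeeds or maxAttempts is reached.
     * Returns the number of attempts used; rethrows the last failure if all attempts fail.
     */
    static <T> int sendWithRetry(List<T> batch, Consumer<List<T>> sender,
                                 int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                sender.accept(batch);
                return attempt;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and let the caller log or dead-letter the batch
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw e;
                }
                delay *= 2; // double the wait before the next attempt
            }
        }
    }
}
```

Blocking inside the listener would stall the processor, so in practice something like this probably belongs on a separate thread or scheduled executor; the sketch only shows the cap and the backoff.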

--
David Pilato - Developer | Evangelist
elastic.co
@dadoonet https://twitter.com/dadoonet | @elasticsearchfr https://twitter.com/elasticsearchfr | @scrutmydocs https://twitter.com/scrutmydocs

On 24 Apr 2015, at 10:59, mzrth_7810 afrazmamoon@gmail.com wrote:

Hey guys,

I'm using the BulkProcessor to index documents in Elasticsearch. It has definitely improved my indexing throughput.

Anyway, I was wondering whether there are any best practices for exception handling with the bulk processor. For example, it would be good to schedule retries in certain scenarios.

At the moment all I'm doing is logging. Could someone point me to a resource with an example of handling a NodeNotConnectedException and doing a retry? I don’t know how to access the contents of the bulkProcessor from within the afterBulk method in the Listener.

        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            if (bulkResponse.hasFailures()) {
                Log.error("We have failures");
                for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                    if (bulkItemResponse.isFailed()) {
                        Log.error(bulkItemResponse.getId() + " failed with message: " + bulkItemResponse.getFailureMessage());
                    }
                }
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable t) {
            Log.error("An exception occurred while indexing", t);

         // How do I add this back to the list of requests?

        }
