[Java] BulkProcessor- custom data in request-response items

Hi there,
I have working code to index documents using BulkProcessor. I need to ensure a kind of correctness of bulk indexing - I need to report failed documents. I can access response items in afterBulk callback (if there were some failed documents) or request items in afterBulk callback (if there was a whole bulk execution failure), however I can access only few properties (or geters) of request/response items. I can access id, index, type etc.
I have got situation where id field is not enough information to classify failed document.
Is there a way to add custom property to request, which will be accessible within response ?

Here is snippet what i want to achieve:

BulkProcessor bulkProcessor = BulkProcessor.builder(elasticsearchClient, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {

    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
            for (int i = 0; i < response.getItems().length; i++)
            {
                BulkItemResponse item = response.getItems()[i];

                // bulk failed item
                if(item.isFailed())
                {
                    //item.getId(), item.index(), item.getFailureMessage() ...
                    String something = item.getSomething(); // <- this is what i want to achieve
                }
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {

        // whole bulk failed, no request was indexed 
        for (int i = 0; i < request.requests().size(); i++)
        {
            //item.getId(), item.index() ... 
            String something = request.requests().get(i).getSomething(); // <- this is what i want to achieve
        }

    }
})
. ... settings 
.build();

Is it even possible ? Elasticsearch Java Client is pretty complex to me, so I need help. Stack Overflow is maybe better place to ask, but I was very satisfied with my previous experience with this forum.
Thank you

The documents are ordered the same in the response as in the request, so you should be able to identify the exact documents that failed by looking at the corresponding index in the request.

The order information is useless for me. I wrote that document id is not enough information to classify failed document, but another (order) id will not help me. I know it looks a little strange, but I think it could be really useful.
Another way is to access failed document source field - is it possible ?

I am feeding bulk processor from a number of threads. Within thread I have got the information I want to add to request/response as I wrote above. When bulk processor executes (lets say the number of requests or size or time flush condition was fulfilled) and some documents fail I am loosing the information which helps me with classification.

Thank you

The Bulk Java API supports adding a payload to each request:

The Bulk Processor supports it as well. See:

So I suppose you can add your own needed payload with that method and retrieve it with request.payloads()

Doc says that payloads are coming back in the same order as the bulk item requests:

    /**
     * The list of optional payloads associated with requests in the same order as the requests. Note, elements within
     * it might be null if no payload has been provided.
     * <p>
     * Note, if no payloads have been provided, this method will return null (as to conserve memory overhead).
     */
    @Nullable
    public List<Object> payloads() {
        return this.payloads;
    }

I hope this helps.

1 Like

Thank you.
I found another solution by extending IndexRequest class and adding custom property.
I think my problem is solved.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.