Missing document in Elasticsearch when using BulkProcessor

I'm using a [java] kafka-producer to push data to kafka topic x, and a [java] high-level consumer/BulkProcessor to read from topic x and index the data into Elasticsearch. The producer pushes 10 docs each time. When I start my BulkProcessor code for the first time after running the producer, I see only 9 records pushed to ES, all with "_version": 1. The 10th record is not in ES.

The beforeBulk() and afterBulk() methods show the following output:

Going to execute new bulk composed of 10 actions
java.lang.StringIndexOutOfBoundsException: String index out of range: 0

The exception in afterBulk() occurs only for the first record. From this moment onward, if I delete the Elasticsearch index and run the producer again, I consistently see all 10 records. I have no idea why this is happening. Any help is appreciated.

Note: ES version 2.2.0

[relevant code]

public Consumer(KafkaStream a_stream, int a_threadNumber, String esHost, String esCluster, int bulkSize, String topic) {
    /* Create transport client */
    this.bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.format("Going to execute new bulk composed of %d actions\n", request.numberOfActions());
        }

        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            System.out.format("Executed bulk composed of %d actions\n", response.getItems().length);
            int i = 0;
            while (i < response.getItems().length) {
                // inspect response.getItems()[i] here
                i++;
            }
        }

        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            System.out.format("Error executing bulk: %s\n", failure);
        }
    })
    .setBulkSize(new ByteSizeValue(200, ByteSizeUnit.MB))
    .build();
}

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext()) {
        byte[] x = it.next().message();
        try {
            bulkProcessor.add(new IndexRequest(index, type, id.toString()).source(modifyMsg(x).toString()));
        } catch (Exception e) {
            logger.warn("bulkProcessor failed: " + m_threadNumber + e.getMessage());
        }
    }
    logger.info("Shutting down Thread: " + m_threadNumber);
}

Docs going to ES are of the following form:

{"index":"temp1","type":"temp2","id":"0","event":"we're doomed"}
{"index":"temp1","type":"temp2","id":"1","event":"we're doomed"}
{"index":"temp1","type":"temp2","id":"2","event":"we're doomed"}
{"index":"temp1","type":"temp2","id":"9","event":"we're doomed"}
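Each message carries its target index, type, and id inline. The actual modifyMsg() method is not shown in the question, but a minimal sketch of pulling those fields out of a one-line message (a hypothetical helper using a simple regex instead of a JSON library) might look like:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MsgFields {
    // Matches "index":"...", "type":"..." or "id":"..." in a one-line JSON message
    private static final Pattern FIELD = Pattern.compile("\"(index|type|id)\":\"([^\"]*)\"");

    /** Returns the value for the given key ("index", "type" or "id"), or "" if absent. */
    static String extract(String msg, String key) {
        Matcher m = FIELD.matcher(msg);
        while (m.find()) {
            if (m.group(1).equals(key)) {
                return m.group(2);
            }
        }
        return "";
    }

    public static void main(String[] args) {
        String doc = "{\"index\":\"temp1\",\"type\":\"temp2\",\"id\":\"0\",\"event\":\"we're doomed\"}";
        System.out.println(extract(doc, "index")); // temp1
        System.out.println(extract(doc, "id"));    // 0
    }
}
```

Note that this helper returns "" for a missing field, which is exactly the kind of silently empty index name that caused the failure described below in the thread.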

Even if I use auto-generated _ids in ES, the same issue persists.


If I add the following line to my run() method, the problem goes away.

public void run() {
    bulkProcessor.add(new IndexRequest(""));  //Added this line
    while (it.hasNext()) {

My bad. In the line bulkProcessor.add(new IndexRequest(index, type, id.toString()).source(modifyMsg(x).toString()));, the method modifyMsg() was responsible for initializing index, type, and id, which were set to empty strings in the constructor. That's why my first index request failed: it had an invalid (empty) index name.
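The takeaway generalizes: an IndexRequest with an empty or invalid index name only fails once the bulk actually executes, surfacing in afterBulk(Throwable) far from the message that caused it. A cheap guard before calling bulkProcessor.add() would flag the bad message immediately. A minimal sketch (the character list follows the Elasticsearch 2.x index-name restrictions: non-empty, lowercase, no filesystem-unsafe characters):

```java
public class IndexNameCheck {
    /** Returns true if the name satisfies basic ES 2.x index-name rules:
     *  non-empty, all lowercase, and free of characters ES rejects. */
    static boolean isValidIndexName(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        if (!name.equals(name.toLowerCase())) {
            return false;
        }
        // Characters Elasticsearch rejects in index names
        for (char c : new char[] {'\\', '/', '*', '?', '"', '<', '>', '|', ' ', ','}) {
            if (name.indexOf(c) >= 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidIndexName("temp1")); // true
        System.out.println(isValidIndexName(""));      // false
    }
}
```

With a guard like this, the consumer can log and skip a malformed message instead of silently losing the first bulk item.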


Thanks for sharing your resolution :slight_smile: