I'm using a Java Kafka producer to push data to Kafka topic x, and a Java high-level consumer with a BulkProcessor to read from topic x and index the data into Elasticsearch. The producer pushes 10 docs each time. When I start my BulkProcessor code for the first time after running the producer, I see only 9 records in ES, all with "_version": 1. The 10th record never makes it to ES.
The beforeBulk() and afterBulk() callbacks print the following:
Going to execute new bulk composed of 10 actions
java.lang.StringIndexOutOfBoundsException: String index out of range: 0
The exception in afterBulk() appears only for the first record. From that point on, if I delete the Elasticsearch index and rerun the producer, I consistently see all 10 records. I have no idea why this is happening. Any help is appreciated.
Note: Elasticsearch 2.2.0, Kafka 0.9.0.0
[relevant code]
public Consumer(KafkaStream a_stream, int a_threadNumber, String esHost, String esCluster, int bulkSize, String topic) {
    /*Create transport client*/
    this.bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.format("Going to execute new bulk composed of %d actions\n", request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            System.out.format("Executed bulk composed of %d actions\n", response.getItems().length);
            // Print messages only for items that actually failed;
            // successful items return null from getFailureMessage()
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    System.out.println(item.getFailureMessage());
                }
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // %s placeholder so the Throwable is actually included in the output
            System.out.format("Error executing bulk: %s\n", failure);
        }
    }).setBulkActions(bulkSize)
      .setBulkSize(new ByteSizeValue(200, ByteSizeUnit.MB))
      .setFlushInterval(TimeValue.timeValueSeconds(1))
      .build();
}
public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext()) {
        byte[] x = it.next().message();
        try {
            // index, type and id are derived from the incoming message
            bulkProcessor.add(new IndexRequest(index, type, id.toString()).source(modifyMsg(x).toString()));
        } catch (Exception e) {
            logger.warn("bulkProcessor.add failed in thread " + m_threadNumber + ": " + e.getMessage());
        }
    }
    logger.info("Shutting down Thread: " + m_threadNumber);
}
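For completeness: run() currently exits without explicitly draining the processor. A minimal sketch of flushing and closing it before the thread shuts down (the 10-second timeout is arbitrary, and java.util.concurrent.TimeUnit must be imported):

    // Sketch: force out any partially filled bulk, then wait for
    // in-flight bulks to finish before the thread exits
    bulkProcessor.flush();
    try {
        bulkProcessor.awaitClose(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }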
Docs going to ES are of the following form:
{"index":"temp1","type":"temp2","id":"0","event":"we're doomed"}
{"index":"temp1","type":"temp2","id":"1","event":"we're doomed"}
{"index":"temp1","type":"temp2","id":"2","event":"we're doomed"}
...
{"index":"temp1","type":"temp2","id":"9","event":"we're doomed"}
Even if I use auto-generated _id in ES, the same issue persists.
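That variant simply drops the id from the request, so ES assigns the _id itself:

    // Two-argument constructor: ES auto-generates the _id
    bulkProcessor.add(new IndexRequest(index, type).source(modifyMsg(x).toString()));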
[EDIT]
If I add the following line to my run() method, the problem goes away.
public void run() {
    ...
    bulkProcessor.add(new IndexRequest("")); // Added this line
    while (it.hasNext()) {
        ...
    }
    ...
}
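To pinpoint which document gets dropped, the failed item can be correlated back to its original request inside afterBulk(). A sketch (getItemId() is the item's position in the bulk; requires java.util.List and org.elasticsearch.action.ActionRequest):

    // Sketch: map each failed item back to the request that produced it
    List<ActionRequest> reqs = request.requests();
    for (BulkItemResponse item : response.getItems()) {
        if (item.isFailed()) {
            IndexRequest failed = (IndexRequest) reqs.get(item.getItemId());
            System.out.println("Failed _id=" + failed.id()
                    + " source=" + failed.source().toUtf8());
        }
    }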