Indexing Performance, Threads + Bulk Size


(davrob) #1

Hi,

I've been indexing about 90,000 documents in about 8 minutes. Is that
good performance for Elasticsearch? I think other people have seen
much better. I've based my indexing on the Wikipedia river. I have a
few questions:

  1. Are bulk updates the way to go for performance, or is iterating
    through individual saves just as fast?
  2. Can I increase the bulk size from 100 to 1000 or 10,000?
  3. Will several concurrent threads adding bulk updates to the same
    master node help performance?

Thanks, my current update code is below.

David.

    // Adds one document to the current bulk request. Returns true when the
    // bulk was submitted and the caller should start a new BulkRequestBuilder.
    public Boolean bulkSave(JsonBean jsonBean, BulkRequestBuilder currentRequest) {
        IndexRequest indexOperation = Requests.indexRequest(indexName)
                .type(jsonBean.getIndexType())
                .id("" + jsonBean.getId())
                .create(false)
                .source(jsonBean.getJson());

        currentRequest.add(indexOperation);
        Boolean createNew = processBulkIfNeeded(currentRequest, processedAccount++);
        return createNew;
    }

    private Boolean processBulkIfNeeded(BulkRequestBuilder currentRequest, int processedAccounts) {
        Boolean createNewBulkReq = false;
        if (currentRequest.numberOfActions() >= bulkSize) {
            // execute the bulk operation
            int currentOnGoingBulks = onGoingBulks.incrementAndGet();
            LOG.info("Ongoing Bulks = " + currentOnGoingBulks
                    + " processed Accounts = " + processedAccounts);
            if (currentOnGoingBulks > dropThreshold) {
                // TODO, just wait here!, we can slow down the parsing
                onGoingBulks.decrementAndGet();
                LOG.error("dropping bulk, " + onGoingBulks + " crossed threshold "
                        + dropThreshold + " processed Accounts = " + processedAccounts);
            } else {
                try {
                    final int bulkNo = onGoingBulks.get();
                    LOG.info("Executing Bulk Request " + bulkNo);
                    currentRequest.execute(new ActionListener<BulkResponse>() {
                        @Override
                        public void onResponse(BulkResponse bulkResponse) {
                            onGoingBulks.decrementAndGet();
                            LOG.info("Bulk [" + bulkNo + "] Executed");
                        }

                        @Override
                        public void onFailure(Throwable e) {
                            // a failed bulk is no longer in flight either, so
                            // release its slot or the counter only ever grows
                            onGoingBulks.decrementAndGet();
                            LOG.error("Error in Bulk [" + bulkNo + "]", e);
                        }
                    });
                } catch (Exception e) {
                    LOG.error("failed to process bulk", e);
                }
            }
            // once we have executed a bulk request, create a new one for adding
            // a fresh set of bulk updates, deletes, additions etc.
            createNewBulkReq = true;
        }
        return createNewBulkReq;
    }

(Shay Banon) #2

I don't read code in mails (the formatting is not kept). If you want, you can gist it and I can have a look.

Indexing speed depends a lot on the documents you index and how big they are. There is a big difference between indexing tweets and indexing attachments.

You can certainly use more than one thread doing the bulk indexing; it will improve indexing performance. As to the size of the batch, ES needs to represent the whole batch request in memory, so make sure you don't send too much data over the network concurrently or cause the JVM to run out of memory.
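
As a rough illustration of the two points above (several bulks in flight, but a cap on how much is sent at once), here is a minimal sketch using only the JDK and Java 8 or later. It is not the Elasticsearch API: `BoundedBulkIndexer` and `BulkSender` are hypothetical names, and `BulkSender.send` stands in for whatever client call actually ships one bulk request. A batch is flushed when it reaches a document count or an approximate size budget, and a `Semaphore` makes the producer wait when too many bulks are outstanding, rather than dropping the bulk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedBulkIndexer {

    /** Hypothetical stand-in for the real client call that sends one bulk request. */
    interface BulkSender {
        void send(List<String> jsonDocs) throws Exception;
    }

    private final BulkSender sender;
    private final int maxDocsPerBulk;
    private final long maxCharsPerBulk;
    private final Semaphore inFlight;      // one permit per bulk allowed in flight
    private final ExecutorService pool;    // threads that actually send the bulks
    private final AtomicInteger failedBulks = new AtomicInteger();

    private List<String> current = new ArrayList<>();
    private long currentChars = 0;

    BoundedBulkIndexer(BulkSender sender, int maxDocsPerBulk,
                       long maxCharsPerBulk, int maxConcurrentBulks) {
        this.sender = sender;
        this.maxDocsPerBulk = maxDocsPerBulk;
        this.maxCharsPerBulk = maxCharsPerBulk;
        this.inFlight = new Semaphore(maxConcurrentBulks);
        this.pool = Executors.newFixedThreadPool(maxConcurrentBulks);
    }

    /** Adds one document; intended to be called from a single producer thread. */
    void add(String jsonDoc) throws InterruptedException {
        current.add(jsonDoc);
        // Character count is only a rough proxy for the encoded request size.
        currentChars += jsonDoc.length();
        if (current.size() >= maxDocsPerBulk || currentChars >= maxCharsPerBulk) {
            flush();
        }
    }

    /** Hands the current batch to the pool, waiting if too many bulks are in flight. */
    void flush() throws InterruptedException {
        if (current.isEmpty()) {
            return;
        }
        final List<String> batch = current;
        current = new ArrayList<>();
        currentChars = 0;

        inFlight.acquire(); // blocks the producer instead of dropping the bulk
        pool.execute(() -> {
            try {
                sender.send(batch);
            } catch (Exception e) {
                failedBulks.incrementAndGet();
            } finally {
                inFlight.release(); // free the slot on success and on failure
            }
        });
    }

    /** Flushes the remainder, waits for outstanding bulks, returns the failure count. */
    int close() throws InterruptedException {
        flush();
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        return failedBulks.get();
    }
}
```

The limits (documents per bulk, size budget, number of concurrent bulks) are placeholders to tune against your own documents and heap size; the right values depend on how large each document is.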

-shay.banon
On Monday, January 17, 2011 at 11:34 AM, dalesrob wrote:



(system) #3