I am upgrading from Elasticsearch 1.7.1 to 2.1.0 and am rerunning on 2.1.0 a test case that I had on 1.7.1.
Here are the steps:
1. Create objects with random values; the corresponding JSON documents are 3 KB each.
2. Index 200 such records (could be any number) into index "test".
3. Drop index "test".
4. Index 200K records into index "test".
In 1.7.1, step 4 took ~40 seconds.
In 2.1.0, step 4 takes ~600 seconds.
I reran the same test on 2.0.1, where step 4 takes ~40 seconds.
If step 3 is not executed, step 4 takes only ~40 seconds in 2.1.0.
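A minimal sketch of how per-step wall-clock timings like these could be collected. StepTimer and ThrowingStep are hypothetical names, and this is not necessarily how the numbers above were measured:

```java
class StepTimer {
    /** A step that may throw a checked exception, such as insertBulkData(int) in the test. */
    interface ThrowingStep {
        void run() throws Exception;
    }

    /** Runs the step and returns the elapsed wall-clock time in seconds. */
    static double timeSeconds(ThrowingStep step) {
        long start = System.nanoTime();
        try {
            step.run();
        } catch (Exception ex) {
            throw new RuntimeException("Step failed", ex);
        }
        return (System.nanoTime() - start) / 1e9;
    }

    public static void main(String[] args) {
        // Placeholder workload; in the real test this would be a call such as insertBulkData(200000).
        double seconds = timeSeconds(() -> Thread.sleep(50));
        System.out.println("step took " + seconds + " s");
    }
}
```

Timing each of steps 2, 3 and 4 separately would show whether the extra time is spent in the second bulk load itself or in the index deletion.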
I saw a lot of these warnings while the performance was slow:
high disk watermark [90%] exceeded on [ZkeRJaTUR4iu3h8DmIiV2w][Thing]... free: 11gb[4.7%], shards will be relocated away from this node.
Also, disk and CPU utilization were significantly higher.
These warnings and the high utilization were not seen in 2.1.0 when step 3 was not executed.
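To make the numbers in that warning concrete: the high watermark is a threshold on used disk space, so 4.7% free means about 95.3% used, which is above the 90% limit. A minimal sketch of that arithmetic (the class and method names are hypothetical and not part of Elasticsearch):

```java
class WatermarkCheck {
    /** Converts a free-space percentage into a used-space percentage. */
    static double usedPercent(double freePercent) {
        return 100.0 - freePercent;
    }

    /** True when used space is above the given high watermark (both in percent). */
    static boolean exceedsHighWatermark(double freePercent, double highWatermarkPercent) {
        return usedPercent(freePercent) > highWatermarkPercent;
    }

    public static void main(String[] args) {
        double freePercent = 4.7;    // "free: 11gb[4.7%]" from the warning
        double highWatermark = 90.0; // "high disk watermark [90%]" from the warning
        System.out.println("used %: " + usedPercent(freePercent)
                + ", exceeded: " + exceedsHighWatermark(freePercent, highWatermark));
    }
}
```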
Java Code:
insertBulkData(1000);
delIndex();
insertBulkData(200000);
private void insertBulkData(int size) throws Exception {
    RandomValueGenerator randomGenerator = new RandomValueGenerator();
    Gson gson = new Gson();
    BulkProcessor bulkProcessor = BulkProcessor
            .builder(client, getBulkProcessListener())
            .setBulkActions(BULK_ACTION_COUNT)
            .setBulkSize(new ByteSizeValue(BULK_ACTION_SIZE_MB, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(FLUSH_INTERVAL_SECS))
            .setConcurrentRequests(CUNCURRENT_REQUESTS)
            .build();
    count = 0;
    // Queue one index request per randomly generated object.
    for (int i = 0; i < size; i++) {
        TestObject obj = randomGenerator.generateOneObject();
        String jsonString = gson.toJson(obj);
        bulkProcessor.add(new IndexRequest(index, type, obj.getKey())
                .source(jsonString));
    }
    try {
        // Send whatever is still buffered, then wait for in-flight bulks to finish.
        bulkProcessor.flush();
        bulkProcessor.awaitClose(BULK_ACTION_CLOSE_WAIT_MINS, TimeUnit.MINUTES);
    } catch (Exception ex) {
        throw new Exception("Unable to insert data into index " + index, ex);
    }
    client.admin().indices().flush(new FlushRequest(index)).actionGet();
}
private Listener getBulkProcessListener() {
    return new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // Nothing to do before a bulk is sent.
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                BulkResponse response) {
            // Bulk completed (possibly with per-item failures): report progress.
            count += request.numberOfActions();
            System.out.println("BulkTest: so far : " + count + " time "
                    + response.getTook());
            if (response.hasFailures()) {
                System.out.println(response.buildFailureMessage());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                Throwable failure) {
            // The whole bulk request failed.
            System.out.println(failure.getMessage());
        }
    };
}
private void delIndex() throws InterruptedException {
    System.out.println("BulkTest: Deleting Index");
    DeleteIndexResponse res = client.admin().indices().prepareDelete(index).get();
    // Give the cluster 10 seconds after the delete.
    Thread.sleep(10000);
    // Note: this flush is not awaited (no actionGet() on the returned future).
    client.admin().indices().flush(new FlushRequest());
    client.admin().indices().prepareRefresh().execute().actionGet();
}