Hi Elasticsearch experts,
I use the Elasticsearch high-level/low-level Java REST client to update some documents. What we found is that the speed is very low: a single bulk request of about 10K documents takes about 3-4 minutes.
What I then did was split the big bulk request into batches of about 1K documents per execution. After that the speed became very fast, about 200 ms per execution.
However, the speed gets lower and lower (200 ms -> 40 s per execution) as time goes on, since there is a huge amount of data waiting to be updated.
We have also checked the Kibana monitoring of the Elasticsearch nodes and did not find anything abnormal during the whole update period.
our index settings:
"number_of_shards": "5",
"number_of_replicas": "0",
"refresh_interval": "-1"
So what I want to know is: what affects Elasticsearch update performance?
The bulk update methods:
/**
 * Queues a partial update of the {@code FieldType.Rank} field for each unit document of the
 * current lot and sends the updates to Elasticsearch in batches.
 *
 * <p>A document is skipped when its rank is non-null and already equals its mother-lot
 * insertion value. Every time {@code Config.bulkSize} updates have been queued, the batch is
 * sent through {@link #executeBulkTask(BulkRequest, int)} and a fresh request is started; any
 * remainder is sent after the loops finish. Processing stops at the first batch that fails.
 *
 * @return {@code true} if every batch that was sent succeeded (including the case where no
 *         document needed updating); {@code false} as soon as one batch fails
 */
private boolean updateUnitData() {
    BulkRequest bulkRequest = newUnitBulkRequest();
    int docCnt = 0;

    for (DataSet dataSet : this.getLotInfo().getDataSets().values()) {
        for (Doc doc : dataSet.getUnitData()) {
            // Nothing to do when the stored rank already matches the target value.
            // A null rank is never skipped, so it always produces an update.
            if (doc.getRank() != null && doc.getRank().equals(doc.getMotherLotInsertion())) {
                continue;
            }

            // jsonMap is declared outside this method and is reused as a scratch map.
            // NOTE(review): reuse is only safe if UpdateRequest.doc(Map) copies/serializes
            // the map at call time - confirm for the client version in use.
            jsonMap.clear();
            jsonMap.put(FieldType.Rank, doc.getMotherLotInsertion());
            bulkRequest.add(new UpdateRequest(doc.getIndex(), "doc", doc.getId()).doc(jsonMap));
            docCnt++;

            // Flush a full batch and start a new one.
            if (docCnt == Config.bulkSize) {
                if (!executeBulkTask(bulkRequest, docCnt)) {
                    return false;
                }
                bulkRequest = newUnitBulkRequest();
                docCnt = 0;
            }
        }
    }

    // Flush the last, partially filled batch (e.g. 900 left over after N full batches).
    if (docCnt != 0) {
        return executeBulkTask(bulkRequest, docCnt);
    }
    return true;
}

/**
 * Creates an empty bulk request configured with a 600-second timeout and the
 * {@code IMMEDIATE} refresh policy.
 *
 * <p>NOTE(review): {@code RefreshPolicy.IMMEDIATE} asks Elasticsearch to refresh after every
 * batch. This is a likely contributor to batches getting slower over time; consider
 * {@code RefreshPolicy.NONE} here plus one explicit refresh after the last batch, if callers
 * do not need each batch to be searchable immediately - confirm before changing.
 *
 * @return a new, empty {@link BulkRequest}
 */
private BulkRequest newUnitBulkRequest() {
    BulkRequest request = new BulkRequest();
    request.timeout(TimeValue.timeValueSeconds(600));
    request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    return request;
}
/**
 * Sends one bulk request through {@code productionClient} and reports the outcome.
 *
 * <p>On success the item count and elapsed milliseconds are printed and an
 * {@code ES_BULK_PASS} event is logged. If the response reports failures, each failed item's
 * message is printed and an {@code ES_BULK_FAILURE} event is logged. If the call throws an
 * {@link IOException}, an {@code ES_BULK_Error} event is logged and the stack trace printed.
 *
 * @param bulkRequest the bulk request to execute
 * @param size        number of updates in the request; used only in the success message
 * @return {@code true} if the response contained no failures; {@code false} if any item
 *         failed or an {@link IOException} was thrown
 */
private boolean executeBulkTask(BulkRequest bulkRequest, int size) {
    final long begin = System.currentTimeMillis();
    try {
        final BulkResponse response = productionClient.bulk(bulkRequest, RequestOptions.DEFAULT);

        // Happy path first: no item failed.
        if (!response.hasFailures()) {
            final long elapsedMillis = System.currentTimeMillis() - begin;
            System.out.printf("\tUpdate Unit Cnt = %d, Total Time is: %s\n", size, elapsedMillis);
            this.logUpsertEvent2ES(
                    FieldType.ES_BULK_PASS,
                    this.dataFormat.getLotIndexName(),
                    this.getLotInfo().getDoc_Id(),
                    FieldType.ES_EVENT_LOT);
            return true;
        }

        // At least one item failed: print a header line, then one line per failed item.
        System.out.printf("\t%s: failed to update the unit %s\n", LocalDateTime.now().toString(), FieldType.Rank);
        for (BulkItemResponse item : response) {
            if (!item.isFailed()) {
                continue;
            }
            final BulkItemResponse.Failure cause = item.getFailure();
            System.out.printf("\t%s:%s\n", LocalDateTime.now().toString(), cause.getMessage());
        }
        this.logUpsertEvent2ES(
                FieldType.ES_BULK_FAILURE,
                this.dataFormat.getLotIndexName(),
                this.getLotInfo().getDoc_Id(),
                FieldType.ES_EVENT_LOT);
        return false;
    } catch (IOException ex) {
        // Record the error event first, then dump the stack trace.
        this.logUpsertEvent2ES(
                FieldType.ES_BULK_Error,
                this.dataFormat.getLotIndexName(),
                this.getLotInfo().getDoc_Id(),
                FieldType.ES_EVENT_LOT);
        ex.printStackTrace();
        return false;
    }
}