Hi,
We are scheduling an hourly Elasticsearch bulk upsert job using the ES 5.6 RestHighLevelClient; the total bulk is roughly 36k documents per hour.
This is the configuration I am using: CONCURRENT_REQUESTS = 30, BULK_ACTION = 300, BULK_SIZE = 5 (MB), FLUSH_INTERVAL = 10 (seconds), TIMEOUT = 5 (minutes).
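For reference, this is what the properties file consumed by setBulkInsertParams() below looks like (a minimal sketch; the file name and comments are mine, but the keys and values match the code and configuration above):

    # bulk-insert.properties (hypothetical file name)
    # Units per the code: BULK_SIZE in MB, FLUSH_INTERVAL in seconds, TIMEOUT in minutes
    CONCURRENT_REQUESTS=30
    BULK_ACTION=300
    BULK_SIZE=5
    FLUSH_INTERVAL=10
    TIMEOUT=5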
Initially, when I index the documents, it is very slow:
Executed bulk composed of {} actions : test.id6.cs115 - test.id6.hostLevel-hourly
Bulk execution completed [5].
Took (ms): 631
Count: 300
Total Executed bulk composed of {} actions : 300
Executed bulk composed of {} actions : test.id6.ap2 - test.id6.hostLevel-hourly
Bulk execution completed [3].
Took (ms): 60199
Count: 300
Total Executed bulk composed of {} actions : 600
Executed bulk composed of {} actions : test.id6.cs15 - test.id6.hostLevel-hourly
Bulk execution completed [6].
Took (ms): 20
Count: 300
Total Executed bulk composed of {} actions : 900
Executed bulk composed of {} actions : test.id6.ap0 - test.id6.hostLevel-hourly
Bulk execution completed [2].
Took (ms): 29839
Count: 300
Total Executed bulk composed of {} actions : 1200
Executed bulk composed of {} actions : test.id6.ap5 - test.id6.hostLevel-hourly
Bulk execution completed [4].
Took (ms): 29845
Count: 300
Also, here is my code:
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLContext;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

public class ESIndexer {

    private static final int PORT = 9200;

    private RestHighLevelClient client;
    private RestClient lowClient;
    private BulkProcessor bulkProcessor;

    // Running total of bulk items acknowledged so far. AtomicInteger because
    // afterBulk() is invoked from multiple threads when concurrentReq > 0.
    private final AtomicInteger numOfReqs = new AtomicInteger();

    // Start time per bulk executionId; a single shared field would be
    // overwritten by overlapping concurrent bulks and skew the timings.
    private final Map<Long, Long> startTimes = new ConcurrentHashMap<>();

    // Defaults, overridden from the properties file in setBulkInsertParams().
    private int concurrentReq = 30;
    private int bulkAction = 200;
    private int bulkSize = 5;      // MB
    private int flushInterval = 5; // seconds
    private int timeout = 5;       // minutes, used by awaitClose()

    public void setBulkInsertParams(String fileName) throws IOException {
        Properties prop = new Properties();
        ESUtils utils = new ESUtils();
        utils.readFile(fileName);
        prop.load(utils.getInputStream());
        concurrentReq = Integer.parseInt(prop.getProperty("CONCURRENT_REQUESTS"));
        bulkAction = Integer.parseInt(prop.getProperty("BULK_ACTION"));
        bulkSize = Integer.parseInt(prop.getProperty("BULK_SIZE"));
        flushInterval = Integer.parseInt(prop.getProperty("FLUSH_INTERVAL"));
        timeout = Integer.parseInt(prop.getProperty("TIMEOUT"));
        System.out.println("concurrentReq is " + concurrentReq);
        System.out.println("bulkAction is " + bulkAction);
        System.out.println("bulkSize is " + bulkSize);
    }

    /**
     * Elasticsearch client initializer. Creates an SSL connection to the
     * given ES URL.
     *
     * @param url
     * @param userName
     * @param password
     * @throws IOException
     * @throws KeyStoreException
     * @throws NoSuchAlgorithmException
     * @throws CertificateException
     * @throws KeyManagementException
     */
    public void initializeClient(String url, String userName, String password) throws IOException, KeyStoreException,
            NoSuchAlgorithmException, CertificateException, KeyManagementException {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

        // Truststore used to validate the cluster's certificate.
        Path certPath = Paths.get("src/main/resources/cert/cacerts");
        KeyStore truststore = KeyStore.getInstance("JKS");
        try (InputStream is = Files.newInputStream(certPath)) {
            truststore.load(is, "changeit".toCharArray());
        }
        SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
        final SSLContext sslContext = sslBuilder.build();

        lowClient = RestClient.builder(new HttpHost(url, PORT, "https"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                                .setSSLContext(sslContext);
                    }
                }).setRequestConfigCallback(new RequestConfigCallback() {
                    @Override
                    public Builder customizeRequestConfig(Builder builder) {
                        // 0 means "no timeout" in RequestConfig; -1 (the value I had
                        // before) means "undefined" and falls back to the defaults.
                        return builder.setConnectionRequestTimeout(0).setSocketTimeout(0);
                    }
                }).build();
        client = new RestHighLevelClient(lowClient);
    }

    private BulkProcessor bulkProcessListener(ThreadPool threadPool) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                startTimes.put(executionId, System.currentTimeMillis());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                long start = startTimes.remove(executionId);
                // The literal "{}" below is a leftover SLF4J-style placeholder;
                // kept as-is so the output matches the log excerpt above.
                System.out.println("Executed bulk composed of {} actions : " + request.requests().get(0).index()
                        + " - " + request.requests().get(0).type());
                System.out.println("Bulk execution completed [" + executionId + "].\n" + "Took (ms): "
                        + (System.currentTimeMillis() - start) + "\n" + "Count: " + response.getItems().length);
                System.out.println("Total Executed bulk composed of {} actions : "
                        + numOfReqs.addAndGet(response.getItems().length));
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                startTimes.remove(executionId);
                failure.printStackTrace();
                System.out.println("Error executing bulk : " + failure.getMessage());
            }
        };
        return new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
                .setBulkActions(bulkAction)
                .setConcurrentRequests(concurrentReq)
                .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
                .build();
    }

    public void executeBulkRequests(List<UpdateRequest> updateRequestList) throws Exception {
        ThreadPool threadPool = new ThreadPool(
                Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "high-level-client").build());
        try {
            bulkProcessor = bulkProcessListener(threadPool);
            for (UpdateRequest updateRequest : updateRequestList) {
                bulkProcessor.add(updateRequest);
            }
        } finally {
            // Flush pending actions and wait up to `timeout` minutes for
            // in-flight bulks to finish, then release the thread pool that
            // drives the BulkProcessor's flush scheduler.
            if (bulkProcessor != null) {
                bulkProcessor.awaitClose(timeout, TimeUnit.MINUTES);
            }
            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
        }
    }
}
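For context, the hourly job drives this class roughly as follows. This is only a sketch under assumptions: the HourlyIndexJob wrapper, the host and credentials, the index/type values, and the docAsUpsert choice are illustrative, not taken from the real job.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.update.UpdateRequest;

public class HourlyIndexJob {

    // hourlyDocs: one map per document, each carrying its precomputed id.
    public static void run(List<Map<String, Object>> hourlyDocs) throws Exception {
        ESIndexer indexer = new ESIndexer();
        indexer.setBulkInsertParams("bulk-insert.properties"); // assumed file name
        indexer.initializeClient("es.example.com", "esuser", "espassword");

        List<UpdateRequest> requests = new ArrayList<>();
        for (Map<String, Object> doc : hourlyDocs) {
            requests.add(new UpdateRequest("test.id6.cs115", "test.id6.hostLevel-hourly",
                    (String) doc.get("id")) // the {dk}{hk}{16-bit id} described below
                    .doc(doc)               // partial document for the update
                    .docAsUpsert(true));    // index the document if it is absent
        }
        indexer.executeBulkRequests(requests);
    }
}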
On subsequent runs, when I create more documents in the same index, indexing becomes very fast. Also, my document ID has the form {dk}{hk}{16-bit representation of an auto-generated ID}.
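To make that ID layout concrete, here is roughly how such an ID could be assembled (a sketch only: the helper name, the dk/hk examples, and rendering the low 16 bits of the auto-generated ID as hex are all my assumptions about what "16-bit representation" means):

// Hypothetical helper illustrating the {dk}{hk}{16-bit auto id} layout above.
public final class DocIds {

    private DocIds() {}

    // dk and hk are assumed to be short key prefixes; autoId is the
    // auto-generated numeric id whose low 16 bits form the final segment.
    public static String buildDocId(String dk, String hk, long autoId) {
        String idPart = String.format("%04x", autoId & 0xFFFFL); // 16 bits as 4 hex chars
        return dk + hk + idPart;
    }
}

// Example: DocIds.buildDocId("dc1", "host42", 70000L) returns "dc1host421170".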
Can anyone suggest how to improve my indexing performance, along with other best practices for bulk processing?