BulkProcessor Indexing Performance

Hi,

We schedule Elasticsearch bulk upserts with the ES 5.6 REST high-level client to index documents every hour. The approximate total bulk size is 36k per hour.
This is the configuration I am using: CONCURRENT_REQUESTS = 30, BULK_ACTION = 300, BULK_SIZE = 5, FLUSH_INTERVAL = 10, TIMEOUT = 5

Initially, when I index the documents, it is very slow:
Executed bulk composed of {} actions : test.id6.cs115 - test.id6.hostLevel-hourly
Bulk execution completed [5].
Took (ms): 631
Count: 300
Total Executed bulk composed of {} actions : 300
Executed bulk composed of {} actions : test.id6.ap2 - test.id6.hostLevel-hourly
Bulk execution completed [3].
Took (ms): 60199
Count: 300
Total Executed bulk composed of {} actions : 600
Executed bulk composed of {} actions : test.id6.cs15 - test.id6.hostLevel-hourly
Bulk execution completed [6].
Took (ms): 20
Count: 300
Total Executed bulk composed of {} actions : 900
Executed bulk composed of {} actions : test.id6.ap0 - test.id6.hostLevel-hourly
Bulk execution completed [2].
Took (ms): 29839
Count: 300
Total Executed bulk composed of {} actions : 1200
Executed bulk composed of {} actions : test.id6.ap5 - test.id6.hostLevel-hourly
Bulk execution completed [4].
Took (ms): 29845
Count: 300

Also here is my code:

/**
 * Pushes batches of {@code UpdateRequest}s to Elasticsearch through the REST
 * high-level client, using a {@code BulkProcessor} to group them into bulk
 * requests.
 *
 * <p>Typical use: {@link #initializeClient(String, String, String)} once,
 * optionally {@link #setBulkInsertParams(String)} to override the bulk
 * tuning defaults, then {@link #executeBulkRequests(List)} per batch, and
 * finally {@link #close()}.
 */
public class ESIndexer {

	private static final int PORT = 9200;

	/** How long to wait for the per-call thread pool to stop after a batch. */
	private static final long THREAD_POOL_SHUTDOWN_SECONDS = 10;

	private RestHighLevelClient client;
	private RestClient lowClient;

	// Bulk callbacks can run on different threads once more than one bulk
	// request is allowed in flight, so the running total must be atomic.
	private final java.util.concurrent.atomic.AtomicInteger numOfReqs = new java.util.concurrent.atomic.AtomicInteger();

	private BulkProcessor bulkProcessor;

	private int concurrentReq = 30;
	private int bulkAction = 200;
	private int bulkSize = 5;
	private int flushInterval = 5;
	private int timeout = 5;

	/**
	 * Loads the bulk tuning parameters from a properties file. The keys
	 * {@code CONCURRENT_REQUESTS}, {@code BULK_ACTION}, {@code BULK_SIZE},
	 * {@code FLUSH_INTERVAL} and {@code TIMEOUT} are all required.
	 *
	 * @param fileName name of the properties file, passed to {@code ESUtils.readFile}
	 * @throws IOException if the properties cannot be read
	 * @throws IllegalArgumentException if a key is missing or is not an integer
	 */
	public void setBulkInsertParams(String fileName) throws IOException {
		Properties prop = new Properties();
		ESUtils utils = new ESUtils();
		utils.readFile(fileName);
		// NOTE(review): ownership of this stream is not visible from here; it is
		// not closed in this method - confirm ESUtils releases it.
		prop.load(utils.getInputStream());

		// Parse everything first so a bad file cannot leave a half-applied config.
		int newConcurrentReq = requiredInt(prop, "CONCURRENT_REQUESTS");
		int newBulkAction = requiredInt(prop, "BULK_ACTION");
		int newBulkSize = requiredInt(prop, "BULK_SIZE");
		int newFlushInterval = requiredInt(prop, "FLUSH_INTERVAL");
		int newTimeout = requiredInt(prop, "TIMEOUT");

		concurrentReq = newConcurrentReq;
		bulkAction = newBulkAction;
		bulkSize = newBulkSize;
		flushInterval = newFlushInterval;
		timeout = newTimeout;

		System.out.println("concurrentReq is"+concurrentReq);
		System.out.println("bulkAction is"+bulkAction);
		System.out.println("bulkSize is"+bulkSize);
	}

	/** Returns the integer value of a mandatory property, failing with a message that names the key. */
	private static int requiredInt(Properties prop, String key) {
		String raw = prop.getProperty(key);
		if (raw == null) {
			throw new IllegalArgumentException("Missing required property: " + key);
		}
		// Properties keeps trailing whitespace in values; trim so "30 " still parses.
		return Integer.parseInt(raw.trim());
	}

	/**
	 * Elastic Search Client Initializer. This creates a ssl connection to the
	 * given ES url, authenticating with basic credentials and trusting the
	 * certificates found in the bundled JKS trust store.
	 *
	 * @param url host name of the Elasticsearch node (the port is fixed to 9200)
	 * @param userName user for basic authentication
	 * @param password password for basic authentication
	 * @throws IOException if the trust store cannot be read
	 * @throws KeyStoreException if the JKS key store type is unavailable
	 * @throws NoSuchAlgorithmException if the trust store integrity algorithm is unavailable
	 * @throws CertificateException if a certificate in the trust store cannot be loaded
	 * @throws KeyManagementException if the SSL context cannot be built
	 */
	public void initializeClient(String url, String userName, String password) throws IOException, KeyStoreException,
			NoSuchAlgorithmException, CertificateException, KeyManagementException {

		final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
		credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

		// NOTE(review): the trust store location and password are hard-coded and
		// relative to the working directory; consider making them configurable.
		Path certPath = Paths.get("src/main/resources/cert/cacerts");
		KeyStore truststore = KeyStore.getInstance("jks");
		try (InputStream is = Files.newInputStream(certPath)) {
			truststore.load(is, "changeit".toCharArray());
		}

		final SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, null).build();

		// NOTE(review): negative timeouts are passed through unchanged from the
		// original code; confirm against the Apache HttpClient RequestConfig docs
		// that this gives the intended (unbounded / system default) behaviour.
		lowClient = RestClient.builder(new HttpHost(url, PORT, "https"))
				.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
						.setDefaultCredentialsProvider(credentialsProvider)
						.setSSLContext(sslContext))
				.setRequestConfigCallback(builder -> builder.setConnectionRequestTimeout(-1).setSocketTimeout(-1))
				.build();

		client = new RestHighLevelClient(lowClient);
	}

	/**
	 * Releases the HTTP connections held by the low-level REST client. Safe to
	 * call when the client was never initialized.
	 *
	 * @throws IOException if closing the underlying client fails
	 */
	public void close() throws IOException {
		if (lowClient != null) {
			lowClient.close();
			lowClient = null;
			client = null;
		}
	}

	/**
	 * Builds a {@code BulkProcessor} configured from the current tuning fields,
	 * with a listener that reports timing, item count, item failures and the
	 * running total for every bulk.
	 *
	 * @param threadPool pool handed to the processor builder
	 * @return the configured processor
	 */
	private BulkProcessor bulkProcessListener(ThreadPool threadPool) {

		BulkProcessor.Listener listener = new BulkProcessor.Listener() {
			// One start time per execution id: with several bulks in flight a
			// single shared timestamp would be overwritten by the latest bulk
			// and every reported duration would be wrong.
			private final java.util.concurrent.ConcurrentMap<Long, Long> startNanos = new java.util.concurrent.ConcurrentHashMap<>();

			@Override
			public void beforeBulk(long executionId, BulkRequest request) {
				startNanos.put(executionId, System.nanoTime());
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
				long tookMs = elapsedMillis(executionId);
				int count = response.getItems().length;
				// Add first so the printed total includes this bulk.
				int total = numOfReqs.addAndGet(count);

				// Emit the report with a single println so that reports from
				// concurrently completing bulks do not interleave line by line.
				String nl = System.lineSeparator();
				StringBuilder report = new StringBuilder();
				report.append("Executed bulk composed of {} actions : ").append(request.requests().get(0).index())
						.append(" - ").append(request.requests().get(0).type()).append(nl);
				report.append("Bulk execution completed [").append(executionId).append("].\n").append("Took (ms): ")
						.append(tookMs).append("\n").append("Count: ").append(count).append(nl);
				report.append("Total Executed bulk composed of {} actions : ").append(total);
				if (response.hasFailures()) {
					// A bulk can succeed at the HTTP level while individual items fail.
					report.append(nl).append("Bulk [").append(executionId).append("] had item failures : ")
							.append(response.buildFailureMessage());
				}
				System.out.println(report);
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
				startNanos.remove(executionId);
				failure.printStackTrace();
				System.out.println("Error executing bulk : " + failure.getMessage());
			}

			/** Returns the milliseconds since beforeBulk for this execution, or -1 if no start was recorded. */
			private long elapsedMillis(long executionId) {
				Long started = startNanos.remove(executionId);
				return started == null ? -1 : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - started);
			}
		};

		return new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
				.setBulkActions(bulkAction).setConcurrentRequests(concurrentReq)
				.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
				.setFlushInterval(TimeValue.timeValueSeconds(flushInterval)).build();
	}

	/**
	 * Sends all given update requests through a fresh {@code BulkProcessor}
	 * and waits up to {@code timeout} minutes for the outstanding bulks to
	 * finish. A null or empty list is a no-op.
	 *
	 * @param updateRequestList requests to index
	 * @throws Exception if adding requests fails or the wait is interrupted
	 */
	public void executeBulkRequests(List<UpdateRequest> updateRequestList) throws Exception {
		if (updateRequestList == null || updateRequestList.isEmpty()) {
			return;
		}

		ThreadPool threadPool = new ThreadPool(
				Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "high-level-client").build());
		try {
			BulkProcessor processor = bulkProcessListener(threadPool);
			bulkProcessor = processor;
			try {
				for (UpdateRequest updateRequest : updateRequestList) {
					processor.add(updateRequest);
				}
			} finally {
				// awaitClose flushes what is buffered and waits for in-flight bulks.
				boolean finished = processor.awaitClose(timeout, TimeUnit.MINUTES);
				if (!finished) {
					System.out.println("Bulk processor did not finish within " + timeout + " minute(s)");
				}
			}
		} finally {
			// The pool is created per call, so it must be stopped per call;
			// otherwise every invocation leaves its threads behind.
			ThreadPool.terminate(threadPool, THREAD_POOL_SHUTDOWN_SECONDS, TimeUnit.SECONDS);
		}
	}

}

In subsequent runs, when I create more documents in the same index, it becomes very fast. Also, my document id is {dk}{hk}{16-bit representation of an auto-generated id}.
Can anyone suggest how to increase my indexing performance, and other best practices for bulk processing?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.