BulkProcessor Indexing Performance


(Sahibdeep Singh) #1

Hi,

We are scheduling Elasticsearch bulk upserts with the ES 5.6 RestHighLevelClient to index documents every hour. The total bulk size is approximately 36k documents per hour.
This is the configuration I am using: CONCURRENT_REQUESTS = 30, BULK_ACTION = 300, BULK_SIZE = 5, FLUSH_INTERVAL = 10, TIMEOUT = 5.
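
For reference, those values come from a properties file that looks roughly like this (a sketch, using the key names read in setBulkInsertParams below):

CONCURRENT_REQUESTS=30
BULK_ACTION=300
BULK_SIZE=5
FLUSH_INTERVAL=10
TIMEOUT=5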

Initially, when I index the documents, it is very slow:
Executed bulk composed of {} actions : test.id6.cs115 - test.id6.hostLevel-hourly
Bulk execution completed [5].
Took (ms): 631
Count: 300
Total Executed bulk composed of {} actions : 300
Executed bulk composed of {} actions : test.id6.ap2 - test.id6.hostLevel-hourly
Bulk execution completed [3].
Took (ms): 60199
Count: 300
Total Executed bulk composed of {} actions : 600
Executed bulk composed of {} actions : test.id6.cs15 - test.id6.hostLevel-hourly
Bulk execution completed [6].
Took (ms): 20
Count: 300
Total Executed bulk composed of {} actions : 900
Executed bulk composed of {} actions : test.id6.ap0 - test.id6.hostLevel-hourly
Bulk execution completed [2].
Took (ms): 29839
Count: 300
Total Executed bulk composed of {} actions : 1200
Executed bulk composed of {} actions : test.id6.ap5 - test.id6.hostLevel-hourly
Bulk execution completed [4].
Took (ms): 29845
Count: 300

Also here is my code:

public class ESIndexer {

private RestHighLevelClient client;
private RestClient lowClient;
private final AtomicInteger numOfReqs = new AtomicInteger(); // total items indexed, updated from concurrent bulk callbacks
private BulkProcessor bulkProcessor;

private final int PORT = 9200;
private int concurrentReq = 30; // concurrent bulk requests allowed in flight
private int bulkAction = 200;   // flush after this many actions
private int bulkSize = 5;       // flush after this many MB
private int flushInterval = 5;  // flush interval in seconds
private int timeout = 5;        // awaitClose timeout in minutes

public void setBulkInsertParams(String fileName) throws IOException {
	Properties prop = new Properties();
	ESUtils utils = new ESUtils();
	utils.readFile(fileName);
	prop.load(utils.getInputStream());
	concurrentReq = Integer.valueOf(prop.get("CONCURRENT_REQUESTS").toString());
	bulkAction = Integer.valueOf(prop.get("BULK_ACTION").toString());
	bulkSize = Integer.valueOf(prop.get("BULK_SIZE").toString());
	flushInterval = Integer.valueOf(prop.get("FLUSH_INTERVAL").toString());
	timeout = Integer.valueOf(prop.get("TIMEOUT").toString());
	System.out.println("concurrentReq is"+concurrentReq);
	System.out.println("bulkAction is"+bulkAction);
	System.out.println("bulkSize is"+bulkSize);
}

/**
 * Elasticsearch client initializer. This creates an SSL connection to the
 * given ES URL.
 * 
 * @param url
 * @param userName
 * @param password
 * @throws IOException
 * @throws KeyStoreException
 * @throws NoSuchAlgorithmException
 * @throws CertificateException
 * @throws KeyManagementException
 */
public void initializeClient(String url, String userName, String password) throws IOException, KeyStoreException,
		NoSuchAlgorithmException, CertificateException, KeyManagementException {

	final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
	credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
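	// Trust store bundled under src/main/resources; "changeit" is the default password for a JDK cacerts file.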
	Path certPath = Paths.get("src/main/resources/cert/cacerts");
	KeyStore truststore = KeyStore.getInstance("jks");
	try (InputStream is = Files.newInputStream(certPath)) {
		truststore.load(is, "changeit".toCharArray());
	}

	SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
	final SSLContext sslContext = sslBuilder.build();

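	// Attach basic auth and the custom SSL context to the underlying async HTTP client; the -1 values
	// below override the REST client's default connection-request and socket timeouts.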
	lowClient = RestClient.builder(new HttpHost(url, PORT, "https"))
			.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
				@Override
				public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
					return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
							.setSSLContext(sslContext);
				}
			}).setRequestConfigCallback(new RequestConfigCallback() {
				@Override
				public Builder customizeRequestConfig(Builder builder) {
					return builder.setConnectionRequestTimeout(-1).setSocketTimeout(-1);
				}
			}).build();

	client = new RestHighLevelClient(lowClient);
}

private BulkProcessor bulkProcessListener(ThreadPool threadPool) {

	BulkProcessor.Listener listener = new BulkProcessor.Listener() {
		// Bulks run concurrently (up to concurrentReq at a time), so keep one start time
		// per executionId instead of a single shared field.
		private final Map<Long, Long> startTimes = new ConcurrentHashMap<>();

		@Override
		public void beforeBulk(long executionId, BulkRequest request) {
			startTimes.put(executionId, System.currentTimeMillis());
		}

		@Override
		public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
			Long start = startTimes.remove(executionId);
			System.out.println("Total Executed bulk composed of {} actions : " + numOfReqs.get());
			System.out.println("Executed bulk composed of {} actions : " + request.requests().get(0).index() + " - "
					+ request.requests().get(0).type());

			System.out.println("Bulk execution completed [" + executionId + "].\n" + "Took (ms): "
					+ (System.currentTimeMillis() - start) + "\n" + "Count: " + response.getItems().length);
			numOfReqs.addAndGet(response.getItems().length);
			if (response.hasFailures()) {
				// Individual update actions can fail even when the bulk request itself succeeds.
				System.out.println("Bulk [" + executionId + "] had failures: " + response.buildFailureMessage());
			}
		}

		@Override
		public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
			startTimes.remove(executionId);
			failure.printStackTrace();
			System.out.println("Error executing bulk : " + failure.getMessage());
		}
	};

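	// client::bulkAsync sends each bulk through the high-level client; a flush is triggered when
	// bulkAction actions, bulkSize MB, or flushInterval seconds accumulate, whichever comes first.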
	BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
			.setBulkActions(bulkAction).setConcurrentRequests(concurrentReq)
			.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
			.setFlushInterval(TimeValue.timeValueSeconds(flushInterval)).build();

	return bulkProcessor;
}

public void executeBulkRequests(List<UpdateRequest> updateRequestList) throws Exception {

	ThreadPool threadPool = null;
	try {
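		// The BulkProcessor needs a ThreadPool to schedule its periodic flushes when used with the REST client.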
		threadPool = new ThreadPool(
				Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "high-level-client").build());

		bulkProcessor = bulkProcessListener(threadPool);
		for (UpdateRequest updateRequest : updateRequestList) {
			bulkProcessor.add(updateRequest);
		}
	} finally {
		// Flush anything still buffered and release the thread pool; guard against a
		// failure before the processor was created.
		if (bulkProcessor != null) {
			bulkProcessor.awaitClose(timeout, TimeUnit.MINUTES);
		}
		if (threadPool != null) {
			ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
		}
	}
}

}
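
For context, this is roughly how the class above is driven each hour (a simplified sketch; the properties file name, URL, credentials, hourlyDocuments, and the buildDocId helper are placeholders, not the real code):

ESIndexer indexer = new ESIndexer();
indexer.setBulkInsertParams("bulk.properties");
indexer.initializeClient("es.example.com", "esUser", "esPassword");

List<UpdateRequest> updateRequests = new ArrayList<>();
for (Map<String, Object> doc : hourlyDocuments) {             // the ~36k documents for this run
	String docId = buildDocId(doc);                            // {dk}{hk}{16-bit auto-generated id}
	updateRequests.add(new UpdateRequest("test.id6.cs115", "test.id6.hostLevel-hourly", docId)
			.doc(doc)                                          // partial document to apply
			.docAsUpsert(true));                               // index the document if it does not exist yet
}
indexer.executeBulkRequests(updateRequests);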

In subsequent runs, when I create more documents in the same index, it becomes very fast. Also, my document ID has the form {dk}{hk}{16-bit representation of an auto-generated ID}.
Can anyone suggest how to improve my indexing performance, and any other best practices for bulk processing?


(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.