I need to index a large amount of data in Elasticsearch. Every document has 3 fields, and I need to index up to 20 million such documents in 5 minutes. I currently reach 10 million with the following settings:
Bulk processor with default settings
4 GB heap assigned to Elasticsearch via VM options, which gives around 400 MB to the index buffer
Replica shards disabled
Refresh interval disabled
A single thread populating the requests in the bulk processor
Default merging
Swapping enabled
A single node with 5 shards, running on a single server
Server has 8 cores and 32 GB RAM
I observed that running two nodes on the same server and feeding data to both concurrently from 2 clients degraded the indexing speed.
What else can be done to hit the 20 million mark, or to improve indexing further?
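To put the goal in numbers, here is a small sketch that converts the figures from the question (10 million and 20 million documents, a 5-minute window) into a required sustained rate. The class and method names are made up for illustration and are not part of any Elasticsearch API.

```java
// Back-of-the-envelope throughput target; the figures come from the question above.
final class ThroughputTarget {

    /** Documents per second needed to index `docs` documents within `seconds`, rounded up. */
    static long requiredDocsPerSecond(long docs, long seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be positive");
        }
        return (docs + seconds - 1) / seconds; // integer ceiling
    }

    public static void main(String[] args) {
        long window = 5 * 60; // 5 minutes, in seconds
        System.out.println("current: " + requiredDocsPerSecond(10_000_000L, window) + " docs/s");
        System.out.println("target:  " + requiredDocsPerSecond(20_000_000L, window) + " docs/s");
    }
}
```

In other words, the setup has to roughly double its sustained rate, from about 33,000 to about 67,000 documents per second.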
Determine what is limiting performance. Look at CPU usage and disk I/O. Also try different bulk sizes and slowly increase the number of threads writing to Elasticsearch if no obvious bottleneck is found.
With the following bulk processor config, CPU usage is around 20-30% and disk I/O is around 70-85%.
BulkProcessor bulkProcessor = BulkProcessor.builder(client, createListener())
        .setBulkActions(20000)
        .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
        .setConcurrentRequests(2)
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();
What I am confused about is the concurrent requests setting (setConcurrentRequests) here. Is it the number of threads that are spawned internally and handed requests to execute concurrently, or is it the number of threads that collect the docs and send them to the ES server?
If I create multiple threads to push data to ES, how will that help? I have a single node listening on port 9300, and data on a port is read sequentially, so how does using multiple threads help?
Say I create 2 nodes on two ports and push the data using 2 clients on different ports. Should that help me achieve greater performance?
Note that a single ES node does not run on a single thread. It uses thread pools to make use of all available CPU resources, and it can handle a huge number of parallel connections.
Bulk processing in synchronous mode (single threaded) works like a single pipeline: the client collects a list of JSON docs, submits them, waits for the cluster to process them, then collects the next list of JSON docs, and so on.
The disadvantage is that synchronous mode (single thread) has to wait for a response before the next list of JSON docs is sent, and this wastes a lot of time.
Bulk processing in asynchronous mode works like a multithreaded pipeline: the first thread collects JSON docs and submits them, but the client does not wait for the cluster's response. Instead it switches to the next thread to collect more JSON docs, and so on, until the concurrency limit is reached. The client receives responses from the cluster as they come in and continues with whichever thread is free.
The advantage of asynchronous mode is better utilization of available system resources and much higher throughput.
Increase
to
and you will notice a difference.
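The difference between waiting for each response and keeping several bulk requests in flight can be sketched without any Elasticsearch classes. The example below is only a stand-in, not BulkProcessor's actual implementation: a `Semaphore` caps the number of in-flight batches, a thread pool plays the role of the network round trip, and the 20 ms delay is an arbitrary assumption. `BulkPipelineSketch`, `submit`, and `run` are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for an asynchronous bulk pipeline; no Elasticsearch classes are used.
final class BulkPipelineSketch {
    private final Semaphore permits;       // caps the number of in-flight batches
    private final ExecutorService senders; // plays the role of the asynchronous round trip
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();

    BulkPipelineSketch(int concurrentRequests) {
        this.permits = new Semaphore(concurrentRequests);
        this.senders = Executors.newFixedThreadPool(concurrentRequests);
    }

    /** Hands a batch off; the caller blocks only once the concurrency limit is reached. */
    void submit(List<String> batch) throws InterruptedException {
        permits.acquire();
        senders.execute(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // assumed round-trip time to the cluster
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.decrementAndGet();
                permits.release(); // frees a slot for the next batch
            }
        });
    }

    /** Sends `batches` one-document batches; returns {completed, max in flight}. */
    static int[] run(int concurrentRequests, int batches) {
        BulkPipelineSketch p = new BulkPipelineSketch(concurrentRequests);
        try {
            for (int i = 0; i < batches; i++) {
                p.submit(Collections.singletonList("doc-" + i));
            }
            p.senders.shutdown(); // already-submitted batches still run to completion
            p.senders.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { p.completed.get(), p.maxInFlight.get() };
    }

    public static void main(String[] args) {
        for (int limit : new int[] { 1, 4 }) {
            long start = System.nanoTime();
            int[] result = run(limit, 8);
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.println("limit=" + limit + " completed=" + result[0]
                    + " maxInFlight=" + result[1] + " elapsedMs=" + ms);
        }
    }
}
```

With a higher limit the same number of batches should finish in less wall-clock time, because the simulated round trips overlap instead of queuing behind each other. Whether a real cluster shows the same gain depends on where the bottleneck is (here, disk I/O was reported at 70-85%).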
Note that you do not need two or more ports to get maximum performance. A client already connects to all cluster nodes in parallel. Opening more connections does not make anything faster; it only wastes network resources.
Using more clients on more machines will help to increase throughput even more. It is up to you to organize how the JSON docs are distributed between the clients.
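One simple way to organize that distribution is round-robin: document i goes to client i mod N. The sketch below assumes the documents are available as an in-memory list, which may not hold for 20 million docs; `DocPartitioner` and `partition` are hypothetical names, and other schemes (ranges, hashing on a key) would work just as well.

```java
import java.util.ArrayList;
import java.util.List;

// Round-robin split of documents across a number of indexing clients.
final class DocPartitioner {

    /** Returns one slice per client; slice sizes differ by at most one document. */
    static <T> List<List<T>> partition(List<T> docs, int clients) {
        if (clients < 1) {
            throw new IllegalArgumentException("clients must be >= 1");
        }
        List<List<T>> slices = new ArrayList<>();
        for (int i = 0; i < clients; i++) {
            slices.add(new ArrayList<>());
        }
        for (int i = 0; i < docs.size(); i++) {
            slices.get(i % clients).add(docs.get(i)); // doc i goes to client i mod N
        }
        return slices;
    }
}
```

Each slice would then be fed to its own client, ideally running on a separate machine so the clients do not compete for the same CPU and network.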
Thanks a lot for explaining the concurrency model of the Bulk processor.
You said a client already makes parallel connections to all nodes in the cluster, and I am curious how it does that.
When we create a Transport client, we specify the socket info (IP address + port) of a single node.
I was under the impression that we connect to one node in the cluster, which serves the client request according to its role.
Also, will I benefit from a multithreaded model where each thread shares one BulkProcessor to push data? That would be similar to the threading model of the BulkProcessor itself, wouldn't it?