Alternative bulk indexing implementations?


#1

I've worked with Elasticsearch for a little over a year and have been increasingly frustrated with bulk indexing performance. I basically expect a continuous load of 250,000 events per second, each 2-4 KB in size.

Repeated analysis has shown Elasticsearch to be the bottleneck. The default BulkProcessor implementation seems to create a great many threads, and there seems to be contention among these threads during my bulk requests.

Several previous attempts to investigate this problem have not yielded helpful results.

I'm wondering if an alternate approach to bulk indexing (a small pool of dedicated threads that do not come and go, but which index continuously) would yield better results. Has anyone thought about this? Started work on it? What would be the best way to do it? Could it be done by extending and overriding existing ES classes, or would I need to fork the ES code base and replace components?
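For what it's worth, the amount of bulk concurrency is already tunable on the stock BulkProcessor before resorting to extension or a fork. A minimal sketch against the ES 1.x Java API (the tuning values here are illustrative assumptions, not recommendations):

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class BoundedBulk {
    // Builds a BulkProcessor with a small, fixed amount of concurrency
    // instead of the defaults. setConcurrentRequests(0) would make it
    // fully synchronous (one bulk at a time).
    public static BulkProcessor build(Client client) {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override public void beforeBulk(long id, BulkRequest request) { }
            @Override public void afterBulk(long id, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    System.err.println(response.buildFailureMessage());
                }
            }
            @Override public void afterBulk(long id, BulkRequest request, Throwable failure) {
                failure.printStackTrace();
            }
        })
        .setBulkActions(2000)                                // flush after 2000 actions
        .setBulkSize(new ByteSizeValue(6, ByteSizeUnit.MB))  // or after ~6 MB
        .setFlushInterval(TimeValue.timeValueSeconds(5))     // or after 5 s of inactivity
        .setConcurrentRequests(2)                            // at most 2 bulks in flight
        .build();
    }
}
```

This doesn't change how many transport threads the client itself spawns, but it does bound how many bulk requests are outstanding at once.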


(Jörg Prante) #2

I think you have identified TransportClient issues, not BulkProcessor issues. Do you use TransportClient?

TransportClient works reliably in "calm" environments, but if the ES cluster enters faulty conditions (because of your documents or tight resources), TransportClient tries, in a best-effort manner, to perform reconnects/failovers to keep the indexing going. That can cost a lot of sockets/threads.

Also, if you use concurrency, make sure your client heap size can handle all the thread activity. If not, you cannot keep up with the bulk indexing speed.


#3

Jörg, thank you for sharing your expertise. I remain interested in how I might extend/replace BulkProcessor, but I had not considered issues with a fundamental component like the client, so thank you for pointing that out as an area for further investigation.

Yes, I am, in fact, using TransportClient. I had previously experimented a bit with NodeClient. Is there another option I'm missing here?

I do not remember it being significantly better. What do you mean by a "calm" environment? My environment only includes my ES instance and my ingest process. Are there any indicators of the TransportClient issues you are alluding to (a high fd count? I'm obviously seeing high thread counts)? And is there a better alternative, and/or could this be the right place to focus further investigation/development?

Currently I use concurrency, with 24 concurrent requests. I've experimented with the BulkProcessor parameters here with limited success.

Thanks again for your thoughts.


(Ivan Brusic) #4

Are you sure the problems are on the client side? Perhaps the clients get held up waiting for an ack on the server side, potentially due to numerous issues such as merging. The default in 2.x is to no longer do async calls.

Ivan


(Jörg Prante) #5

Have you monitored the cluster nodes? Are there any issues? Maybe in the logs?


#6

On the cluster itself, fairly frequent throttling of indexing is the most prominent symptom. Due to the nginx configuration, setting up Marvel is a bit painful, so I do most of my monitoring with curl and the stats/_cat APIs.


(Jörg Prante) #7

Throttling of indexing is enabled? That is not the best setting for bulk indexing performance.

I deduce the following: you "expect" to index 250k events per second with an average size of 3k, that is 750 MB per sec. Is that correct?

If so, you need some hard thinking about "bottleneck". Do you have that much network bandwidth? Do you intend to use a single client? That will not work, by far, IMHO.

750 MB per sec should be broken down into pieces that nodes can handle - let's say 20 MB per sec per cluster node. This is pretty healthy but it depends on the resources of your hardware. Still you will require around 40 cluster nodes with 40 clients, each serving ~20 MB per sec.
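The sizing arithmetic above can be written out explicitly. A small self-contained sketch using the numbers from this thread (250k events/sec at ~3 KB each, and an assumed sustainable ~20 MB/sec per data node):

```java
// Back-of-envelope cluster sizing for bulk indexing throughput.
public class BulkSizing {
    // Total ingest volume: events per second times average event size.
    static long totalBytesPerSec(long eventsPerSec, long avgEventBytes) {
        return eventsPerSec * avgEventBytes;
    }

    // Nodes needed, rounding up: a partial node still needs a whole machine.
    static long nodesNeeded(long totalBytesPerSec, long perNodeBytesPerSec) {
        return (totalBytesPerSec + perNodeBytesPerSec - 1) / perNodeBytesPerSec;
    }

    public static void main(String[] args) {
        long total = totalBytesPerSec(250_000L, 3_000L);  // 750,000,000 bytes/sec, ~750 MB/sec
        long nodes = nodesNeeded(total, 20_000_000L);     // 38 nodes at 20 MB/sec each
        System.out.println(total + " bytes/sec -> " + nodes + " nodes");
    }
}
```

The exact quotient is 37.5, so 38 nodes at minimum, which is where the "around 40" figure comes from once you add some reserve.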

Some hints:

  • calculate your requirements; it's not very clear to me in what bulk index volume dimensions your cluster must work
  • generally, while indexing, check server logs for garbage collections/OOMs and check server load (not in the logs) in order to find the limits of your node capacity; without knowing the node limits you cannot size a cluster at all
  • if you measure bulk indexing in bytes per second in your client logs, you get a pretty useful number for your scaling calculations
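The last hint, measuring bulk indexing in bytes per second on the client, can be sketched as a tiny self-contained throughput meter. In real code you would call `record()` from the BulkProcessor listener's `afterBulk()` (the class name here is hypothetical):

```java
import java.util.concurrent.atomic.AtomicLong;

// Client-side throughput meter: accumulate the byte size of each completed
// bulk request and derive MB/sec over the elapsed wall-clock time.
public class ThroughputMeter {
    private final AtomicLong bytes = new AtomicLong();
    private final long startNanos = System.nanoTime();

    public void record(long requestBytes) {
        bytes.addAndGet(requestBytes);
    }

    public long totalBytes() {
        return bytes.get();
    }

    public double megabytesPerSec() {
        double seconds = (System.nanoTime() - startNanos) / 1e9;
        return seconds > 0 ? bytes.get() / 1e6 / seconds : 0.0;
    }
}
```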


#8

No, the problem could certainly be that the client is just waiting for the cluster to catch up on requests. The client-side symptom is contention between threads and blocking. As a result, I'd want finer-grained control over this behavior: maybe fewer threads, maybe a larger local queue, to reduce the local cost of this latency.

I experimented with 2.x indexing performance and did not notice a significant difference. Is there a way to select async calls in ES 1.7.x?


#9

This is a correct characterization. That is my target performance with live data, and the rate at which I'll be submitting sample events for my performance evaluation.

  • Daily indexes (I may have to adjust sharding if I reach my target rate, to resolve index/segment limitations)
  • 5 shards, no replicas
  • Different types with about 10 common fields; doc_values are enabled to allow search
  • 10-20 type-specific fields
  • Average record size 3 KB

For a clearer picture, I'm currently using a one-server cluster with ample memory and processor cores. The drawback is slower storage. I may have the option of using tiered storage in the future, but that is not currently the case. So I don't feel that network bandwidth is really a concern for me.

Currently, a single TransportClient is used for the bulk indexing (over loopback), and I've described its characteristics previously; if there's something I can clarify there, let me know. It currently sends up to 2000 messages per batch, with up to 24 concurrent requests in flight. I think this is about 6 MB per request (2000 * 3 KB), with up to roughly 144 MB in flight at a time (6 MB * 24). Are there disadvantages to using a single client?
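The in-flight heap estimate in this post can be written out explicitly (2000 actions per batch, ~3 KB per document, 24 concurrent requests; the exact product is 144 MB):

```java
// Worst-case client-side memory held by outstanding bulk requests.
public class InFlight {
    // Bytes in one batch: actions per batch times average document size.
    static long batchBytes(int actions, int avgDocBytes) {
        return (long) actions * avgDocBytes;
    }

    // Bytes in flight at once: one batch times the concurrency limit.
    static long inFlightBytes(int actions, int avgDocBytes, int concurrentRequests) {
        return batchBytes(actions, avgDocBytes) * concurrentRequests;
    }

    public static void main(String[] args) {
        System.out.println(batchBytes(2000, 3_000));        // 6,000,000 bytes, ~6 MB
        System.out.println(inFlightBytes(2000, 3_000, 24)); // 144,000,000 bytes, ~144 MB
    }
}
```

This is the payload alone; the actual heap cost per request is higher once request objects and serialization buffers are counted.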

...

I've experimented with additional local instances of ES to make better use of the available memory on that box. I have used numactl to assign processor affinity (physcpubind) so that each instance gets a reasonable subset of the 20 cores. The impact on indexing is still being determined. However, for my purposes, if I can reach my target indexing rate through one or many instances, either approach is a win.

I am not sure what you mean by "bulk index volume dimensions"; let me know how I can better answer this if I haven't already.

I have generally eliminated most of the OOM/long-GC issues I encountered in normal use by using a 30500m heap size, reducing my indexing memory from 50% to 25%, and using doc values. If I make an unkind query, I can still trigger this behavior, but it is a much rarer occurrence than when I started this project.
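For reference, the memory settings described here map to standard ES 1.x configuration: the indexing buffer lives in elasticsearch.yml and the heap is set via an environment variable. A config sketch (values copied from this post):

```yaml
# elasticsearch.yml: indexing buffer reduced from 50% to 25% of heap
indices.memory.index_buffer_size: 25%
# the heap itself is set outside the config file, e.g. ES_HEAP_SIZE=30500m
```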

When you say this, I assume you mean the number of requests posted? So when I add a bulk request to my BulkProcessor, do I add 3 KB to my current batch size? Or is it end-to-end, not counting anything until I get a "this many were successful" response from BulkProcessor?

Thanks again for your thoughts.


(Jörg Prante) #10

Some more things to consider.

It seems you deliberately test on a single machine, partitioning a huge hardware into nodes.

While that is surely a possibility that is close at hand, it is not the preferred method by which Elasticsearch is designed to scale. A single ES node can utilize all cores of a machine, and every shard it creates can allocate all resources. The main limiting factor when scaling is the ability of the JVM to handle large memory; today's JVM architecture works best at a heap of ~8 GB, and larger heaps increase GC overhead. More than one JVM per machine comes with a slight penalty, because the JVMs compete with each other, Java threads must be mapped to the OS's native thread model, and the GC overhead adds up.

The preferred method is to add nodes when resources get low, that is, ES scales horizontally, not vertically.

That said, I suspect the "bottleneck" you have observed is also inherent to the choice of an architecture where all the components are squeezed onto a single machine. Besides that, it's really hard to deduce measurements for the capability of a single node.

My rules of thumb are:

  • small hardware, many servers: at least 3 machines in a cluster (1 or 2 do not form a true distributed system)
  • every data node must be busy indexing all the time, so there must be the same number of shards per machine for each index. This ensures that bulk indexing can distribute the load equally over the nodes. ES shard allocation helps; by default it tries to balance the shard counts
  • separation of concerns: client nodes should be remote, not situated on a machine with a data node. This separates the workload of data ingestion from index building (segment merging etc.)
  • no replicas while bulk indexing is ongoing, refresh disabled (and all the other recommended bulk setup points)
  • if the client node(s) cannot establish the required throughput, add the next client node, or data node, and so on
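The bulk-mode settings mentioned above (no replicas, refresh disabled, throttling off) can be applied with curl, in keeping with the monitoring approach already in use in this thread. A sketch for ES 1.x; "myindex" is a hypothetical index name:

```shell
# per-index: disable refresh and drop replicas for the duration of bulk indexing
curl -XPUT 'localhost:9200/myindex/_settings' -d '{
  "index.refresh_interval": "-1",
  "index.number_of_replicas": 0
}'

# cluster-wide: turn off store-level indexing throttling while bulk loading
curl -XPUT 'localhost:9200/_cluster/settings' -d '{
  "transient": { "indices.store.throttle.type": "none" }
}'
```

Remember to restore the refresh interval and replica count once the bulk load finishes.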

The concurrency rate of 24 is quite high. Whether it is appropriate depends on the client node's CPU core count and the cluster's capability to answer. The larger the cluster node count, the faster the bulk responses. Maybe you have 24 idle cores on the client node; then it should be OK. There is a tradeoff between large bulk requests and a high bulk concurrency rate: the larger a bulk request, the more heap is used and the longer a request/response takes.

Monitoring CPU (system load), disk I/O, and network traffic is essential and gives a good idea of what is going on between client nodes and data nodes.

By volume dimensions, I mean the calculation for matching the amount of data you want to index per time frame to the available hardware resources. It depends on the size of the data input, the interval/frequency of new data, but also the power of the cluster (throughput) and the reserve you might want to allocate for peak times. Without that calculation, you don't know the number of nodes required.

By far, disk I/O is the slowest component of an ES cluster, and the first candidate for a "bottleneck" when it comes to bulk indexing.

