Slow bulk speed

I'm a bit stumped. If increasing the heap size does not help (and I don't think it will, but it's worth trying), then ….

It doesn't look like the I/O.
You say it's not the nesting.
There aren't enough docs for the _id handling to be the issue.
You have plenty of oomph.

What’s feeding the data to ES?

The node stats output is too big for me to eyeball without getting lucky. Maybe others have tools to extract something from it.

That is a very good point. What are you using to index data into Elasticsearch? How many parallel bulk indexing threads are you using? Where does this data come from?

I have through the years seen many users complain about indexing speed, only to find out that they are indexing data in a single thread or are limited by the speed at which they can extract the data from e.g. a relational database.

If it is possible that the ingest pipeline is the limiting factor you can do the following:

  • Write data into one or more large files on disk. Make sure the files contain enough data to result in at least a few GB of indexed data on disk.
  • Then create a Logstash pipeline that uses a file input to read this data from disk and write it to a test index in Elasticsearch using a custom id. For this test you can assign a UUID as document id.
  • Run this pipeline in Logstash. It will by default use multiple threads to ingest data. If this results in better performance than your custom indexing process, it shows where the issue lies (a minimal pipeline sketch follows this list).
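As a rough illustration only (not a definitive pipeline), such a test configuration could look something like the following. The file path, index name, host and credentials are placeholders you would need to adapt, and the uuid filter is used to generate the random document id:

input {
  file {
    path => "/tmp/bulk-test/*.json"      # placeholder path to the exported files
    mode => "read"
    sincedb_path => "/dev/null"
    codec => "json"                      # assumes one JSON document per line
  }
}
filter {
  uuid {
    target => "[@metadata][doc_id]"      # random UUID used as the document id
  }
}
output {
  elasticsearch {
    hosts       => ["http://192.168.44.228:9200"]   # adjust scheme/host to your cluster
    index       => "bulk-speed-test"                # placeholder test index
    document_id => "%{[@metadata][doc_id]}"
    user        => "elastic"                        # placeholder credentials
    password    => "changeme"
  }
}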

It's our own tool, which takes records from Kafka and fills bulk requests. Of course it's multithreaded - reading from Kafka as a consumer is so fast that building and executing the bulks can't keep up, so mostly we are waiting on ES bulk execution. In our configuration we write to ES with 16 parallel tasks, and increasing the task count doesn't give any additional performance. And what about Logstash - is it just another way to execute bulks? Also, I forgot to show our elasticsearch.yml:

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
#----------------------------------- Security ---------------------------------
xpack.security.enabled: true
xpack.security.authc.api_key.enabled: true
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: test-elasticsearch
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-test-test
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /data/elasticsearch-8.14.3/data-elasticsearch
#
# Path to log files:
#
path.logs: /data/elasticsearch-8.14.3/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# By default Elasticsearch is only accessible on localhost. Set a different
# address here to expose this node on the network:
#
network.host: 192.168.44.228
#
# By default Elasticsearch listens for HTTP traffic on the first free port it
# finds starting at 9200. Set a specific HTTP port here:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.seed_hosts: [ "192.168.44.228", "192.168.44.229", "192.168.44.230"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
cluster.initial_master_nodes: [ "192.168.44.228", "192.168.44.229", "192.168.44.230"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Allow wildcard deletion of indices:
#
#action.destructive_requires_name: false
#
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /data/elasticsearch-8.14.3/config/elastic-stack-ca.p12
xpack.security.transport.ssl.truststore.path: /data/elasticsearch-8.14.3/config/elastic-certificates.p12

I also tried to increase the JVM heap up to -Xms31744m -Xmx31744m - unfortunately, it made no difference in bulk performance.

Sorry for the long answer, but there is a limited number of replies for a new account.

How does indexing throughput vary based on the number of threads used? Can you run it with 2, 4, 8 and 16 threads to compare?

Reading off Kafka is generally very fast so this surprises me. How many partitions does the topic you are reading from have? Have you verified that there actually are 16 bulk requests in flight to Elasticsearch at the same time? Are you acknowledging the Kafka read before you send the bulk request to Elasticsearch or after it has returned?

I see that Kafka gives me (as a consumer) 100+ hundreds of records per second; I don't do anything additional with Kafka. Each record from Kafka is added to a queue and processed (it's just JSON conversion - the performance profiler shows the time spent there is insignificant in the whole process), and then I build bulks after exceeding a limit of N (I tried 1000, 5000 and 25000 per bulk). I cannot do parallel bulks because we can have, for example, a record with _id 100 in the Kafka record at offset 1, and an update to the document with the same _id in Kafka record number 1001 (a different bulk). Our parallel execution is limited per topic (meaning one Kafka topic = 16 threads processing the JSON conversion and filling the one bulk assigned to that topic and ES index). I think it's not a regular ES scenario, but maybe there are some ideas for speeding up this scenario? Logstash doesn't seem to be a solution; it's mostly applicable for the initial filling of ES, as I understood.

OK, so it sounds like you have 16 different topics, each with a single consumer, in order to maintain order. This then means that you can not go beyond 16 bulk threads in total. Is that correct?

Do all topics have ample supply of data so they are all working at full speed or would it be possible for some to idle part of the time?

Yes, that's right

Topics are independent; I have a dedicated bulk object for each topic, so filling the bulk for one topic doesn't depend on arrivals from a separate Kafka topic.

I also tried yesterday to minimize the document's JSON by removing arrays - it makes a difference - speed increased up to 8000-9000 RPS. But unfortunately we cannot lose that data. I mean this part:

  "occurOnReserve": [
    "QqTlgUartYwVBXl0SW0sPdEDPn6tjx5LoosltChzocVMs8nDY7pAsGyphFOCMSNL",
    "fIkmVl5lk7cXH1t55KlHtt5ERVzf0Y8EbxYbLpjV1t01X3g9X9D6p0uD0F26Ooeq",
    "wUVM6eQAwwqnzfbYrutGOBHEfH7WWvj6kk3y1tpTyyqXLOBwSfDNRN8u5Xhb1xx9"
  ],
  "occurOnCommit": [
    "AYKCXTCSJgvwTJv3jmNdphoPvPqYVGIXYeYgSQ4SNc4jeSSfJ95fYU1sOl0bYFW7",
    "K7LcbVKhLvJ1ck9lVAqtC8GEBSFs2GeRspVoE3KyFpmJe8q5n4ym4pYUIUrXInGa",
    "Jk79glpX4egTmad6dk5Foaa48NiwDQkUbgeXBnS7QqdnQ4JQeL4PHEqeaPbyptCa"
  ],
  "occurOnFullCommit": [
    "okLcWNb1uGkIQGRz5pQ87r78TNAQGZB7EBXqzKRQ9pnF6ScuutuO1ar2bF6ZAhSd",
    "3WDY22UpihL0SxcXNL0ca7gCSqohWKsR6BC2TC8mZ4OT55HEGLVZFuKuWWEOaXY7"
  ],
  "occurOnGet": [
    "sCIq86zdbZ6dsjvG7G79UqbpIqdqxCJRlB9c6L4CjJrRD6IUWUAkGqqZj26ZaUjR",
    "xDvfm4zMmBfduyVJ1vU5Ilwbn0Q42ZFVL7Xz2FzugnegGvARkxYdzUHW1x7jURMB"
  ],
  "occurOnLifecycle": [
    "UYfrQW8tZaKRHKeWZn8UD29nGdpkvnWqxcPVgbx5vJYj6E6FySn8IBv8RASdswZg"
  ],
  "initialValue": "3154138969255271271",
  "IRPAamount": "6600453853226938062",
  "valueHistory": [
    {
      "value": "2721073467101191060",
      "totalReservedAmount": "5493514222296149495",
      "dateFrom": "1737872769",
      "dateTo": "1737872779",
      "realModifyTime": "551052756"
    },
    {
      "value": "3303591803327152453",
      "totalReservedAmount": "6659231958921893806",
      "dateFrom": "1737872770",
      "dateTo": "1737872775",
      "realModifyTime": "674240877"
    },
    {
      "value": "6824078905440788558",
      "totalReservedAmount": "5247724868887723338",
      "dateFrom": "1737872769",
      "dateTo": "1737872773",
      "realModifyTime": "123745833"
    },
    {
      "value": "4772824632370736712",
      "totalReservedAmount": "7170061691535416162",
      "dateFrom": "1737872763",
      "dateTo": "1737872779",
      "realModifyTime": "440388519"
    }
  ],

Also, I made one change - I removed 'nested' from the index mappings and set those fields to 'object'.
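For reference, a minimal sketch of what that mapping change might look like (the index name is a placeholder and valueHistory is taken from the sample document above); with "object" the array entries are flattened into the parent document, whereas "nested" indexes each array entry as a separate hidden sub-document, which is more expensive at indexing time:

PUT /test-index
{
  "mappings": {
    "properties": {
      "valueHistory": {
        "type": "object"
      }
    }
  }
}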

Given that you require ordering, you will need to ensure single-threaded bulk indexing is as fast as possible. Optimising mappings and only indexing the fields you need to be able to search on may help to some extent, but I suspect your main problem may lie elsewhere.

Elasticsearch is in my experience not very good at handling multiple updates to single documents within short time periods as I believe each update will result in a refresh, which is an expensive operation. This will override the refresh interval on the index, so increasing that probably does not help in your scenario. I would therefore recommend that you deduplicate data per bulk request before you send it to Elasticsearch in order to reduce the impact of this. A common guideline for optimal bulk size is often stated as 5 to 10 MB, but in your case it may be good to go larger as that likely will make the deduplication more efficient. Please test this and see if it improves performance.
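As a rough sketch of that deduplication step (the extract_id helper is a hypothetical function that returns whatever you use as the Elasticsearch _id for a document):

def dedupe_bulk(records, extract_id):
    """Collapse one bulk batch so each document id appears only once,
    keeping the last (most recent) version seen in Kafka order.

    records    - already JSON-converted documents for one bulk, in Kafka order
    extract_id - hypothetical helper returning the Elasticsearch _id of a document
    """
    latest = {}                           # dict preserves first-seen order of ids (Python 3.7+)
    for doc in records:
        latest[extract_id(doc)] = doc     # later versions overwrite earlier ones
    return list(latest.values())          # final version per id, ready for the bulk request

With 5000 to 25000 documents per bulk and frequent updates to the same ids, collapsing each id to its final version before the request is sent can noticeably reduce the work Elasticsearch has to do.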

If throughput is still lacking you may need to increase the number of topics/partitions in order to increase concurrency.

Thank you for the answer. Do you mean 'enabled' in the index mappings?

Please see the documentation around optimising for disk usage for details. Some of these will also help with performance, as outlined in the guide on tuning for indexing throughput.

No, not at all an accurate understanding

The idea of filling a bunch of files with data and then letting Logstash bulk index them is more to confirm whether the bottleneck really is the indexing on the ES side?

I managed a somewhat similar setup a few years back: logs going into Kafka, Kafka feeding Logstash, Logstash indexing into ES. Ordering didn't matter, it was "just logs" with timestamps. All were small VMs, later containers. A 3-node Kafka cluster, one logs topic, and I think 5 Logstash indexers. I know we had 25k+ logs per second at peak. The logs were complex documents too, but at least mostly flat, and normalised within Logstash too.

The 16 different Kafka topics all feed into the same ES index?

I do not necessarily think this matters much as they would each hold separate sets of document ids.

Is that guaranteed? Did I miss that bit?

And are the throughput numbers quoted per index, or overall? If it's a one-to-one mapping of Kafka topic to index, then that's good, and better than 9k docs per second should be achievable overall. But per index, maybe not.

If it's not a one-to-one mapping, how is the ordering in index X being guaranteed?

No, 16 threads feed 8 bulks for different indexes separately, and bulk execution is separate too - if a bulk has filled up enough (5000 documents, for example) it's executed without waiting for the other bulks.

This is the key point IMO - the node has 96 CPUs but with only 16-or-fewer requests in flight at once you're only going to use a fraction of that CPU for indexing. Possibly increasing number_of_shards on the target index will help (indexing parallelizes across shards, to some extent), but really the bottleneck here sounds like it's upstream of ES, in the client. The solution is to send more concurrent requests. You need to be trying to push ES to the point where it starts to reject work with a 429 Too Many Requests occasionally - any less than that, you're not trying hard enough and ES has capacity to spare.
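For what it's worth, number_of_shards can only be set when an index is created (or changed afterwards via the split/shrink APIs), so experimenting with it means creating a fresh index along these lines (index name and shard count are just placeholders to test with):

PUT /bulk-speed-test
{
  "settings": {
    "index.number_of_shards": 8,
    "index.number_of_replicas": 1
  }
}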

This is also important. If you have lots of updates to the same document, it is way more efficient to process them all on the client and send the result to ES rather than to let ES process each update one-by-one.

Agree with all that, but I also feel things here are a little strangely engineered. Keeping stuff simple is generally better.

If I'm understanding things, then we have multiple Kafka topics, receiving data at varying rates, each topic containing a bunch of versions of the same document, and these are indexing N documents to M indices at discrete times, which per index need to be indexed in the right order because only the last doc is required. Until it's overwritten shortly thereafter, maybe even within the same bulk request.

( Didn’t we call that a transaction once ? )

At the very least I'd dedupe the N requests down to N-X before sending them to ES. But if X is anything like the same order as N, or indeed significant at all in absolute value, then the design is …. suboptimal.

Does ES maintain a stat on #refreshes per index?

Sorry, ES doesn't support transactions in the usual sense of the word - a bulk can fail on one operation while the others were done. It's not a transaction since there is no rollback; it's just a pack of operations bundled into one REST API call to execute. So the main question is: how can we increase bulk performance with these requirements? No, we cannot make parallel bulks per Kafka topic, but IF we could tell ES "take this bulk, parse it, and send us the response before indexing" (as I think that is where most of the time is spent) - it would be great.

What you can do is increase the number of partitions of the Kafka topic and have one consumer per partition. As you need ordering, you will need to assign documents to partitions based on the document id, so that all documents related to a single document id end up in a specific partition. This way you will have multiple bulk requests in flight to the index at the same time, but as they contain separate sets of document ids this should not be a problem.
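A minimal sketch of the producer side under that scheme, using the kafka-python client (the topic name, broker address and the doc["id"] field are assumptions about your setup): producing with the document id as the message key means Kafka's default partitioner hashes all updates for one id to the same partition, so per-document order is preserved while each partition can be consumed and bulk indexed independently.

import json
from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
    bootstrap_servers="kafka-host:9092",                      # placeholder broker address
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish(doc):
    # Keying by the document id keeps all updates for that id in one partition,
    # preserving their order while allowing one consumer (and one bulk stream)
    # per partition.
    producer.send("documents-topic", key=doc["id"], value=doc)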

When doing this you also need to deduplicate each bulk batch before sending it to Elasticsearch as we discussed earlier.

I do not think this is possible. Handling it this way would also, if it were possible, not allow you to resend data if the bulk request for some reason failed to index, and you could easily end up dropping data without knowing it.

I meant transaction slightly sarcastically.

My suggestion isn't what you wrote, but first of all it'd be interesting to know roughly what X is. X = the number of documents per bulk that are updating the same _id. If it's almost always 0, a fair bit of this thread is moot.

If it's even 1%, then …. that's a problem you probably need to solve outside ES. IMHO.

Writing a little Kafka consumer to do that for your topics isn't hard.
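For example, a throwaway consumer along these lines could estimate X per bulk-sized window, using the kafka-python client (the topic name, broker address, bulk size and the "id" field are assumptions about your data):

import json
from kafka import KafkaConsumer  # kafka-python client

BULK_SIZE = 5000                                   # same batch size used for the ES bulks

consumer = KafkaConsumer(
    "documents-topic",                             # placeholder topic name
    bootstrap_servers="kafka-host:9092",           # placeholder broker address
    value_deserializer=lambda v: json.loads(v),
    auto_offset_reset="earliest",
    enable_auto_commit=False,                      # measurement only, commit nothing
)

window, seen, duplicates = [], set(), 0
for message in consumer:
    doc_id = message.value["id"]                   # assumed document id field in the record
    if doc_id in seen:
        duplicates += 1                            # this doc would overwrite one already in the bulk
    seen.add(doc_id)
    window.append(doc_id)
    if len(window) == BULK_SIZE:
        print(f"X = {duplicates} duplicate ids out of {BULK_SIZE}")
        window, seen, duplicates = [], set(), 0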