Question regarding size of bulk action

I have 3 nodes designated to be the ingestion nodes.
So all writers will round robin to those 3 nodes.
My question is: should the bulk write size be sized for the ingestion node, or for the primary shards on the data nodes?
Let's say I have 30 data nodes and an index with 30 primary shards, so each bulk write will be split across potentially 30 data nodes from one of the ingestion nodes.
Assume there are no hot spots, so data is evenly distributed across the 30 shards.
Could I potentially send bulk writes to the ingestion node at 30x the size?

Suppose I'm aiming for 20MB of data per bulk write. Should the bulk write then be 600MB?

The reason I'm asking is that I have been sizing my bulk writes for a single node (20MB in the example), since the ingestion nodes don't do much beyond receiving the request and distributing it to the data nodes.
With the above example, if I bulk write 20MB, each data node will only receive roughly 680KB of payload (20MB / 30 shards), which seems too small.
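To make the arithmetic concrete, here is a minimal sketch (assuming documents hash evenly across primary shards, which is roughly what default routing gives you) of how the per-shard slice of a single bulk request shrinks as the shard count grows:

```python
def per_shard_payload(bulk_size_bytes: int, primary_shards: int) -> float:
    """Approximate payload each primary shard receives from one bulk request,
    assuming documents hash evenly across shards."""
    return bulk_size_bytes / primary_shards

BULK_SIZE = 20 * 1024 * 1024  # the 20MB bulk request from the example above

for shards in (1, 10, 30, 100):
    kb = per_shard_payload(BULK_SIZE, shards) / 1024
    print(f"{shards:>3} primary shards -> ~{kb:,.0f}KB per shard")

# Output:
#   1 primary shards -> ~20,480KB per shard
#  10 primary shards -> ~2,048KB per shard
#  30 primary shards -> ~683KB per shard
# 100 primary shards -> ~205KB per shard
```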

Should I be increasing the bulk write size whenever I increase shard count?

No.
We generally recommend bulk requests of no larger than 5MB.

Ok. That was just an example.
Do you recommend 5MB regardless of the number of shards in an index? My main question is that an index with 1 shard vs 10 shards will produce very different patterns on the data nodes:
1 shard == 5MB of payload to the data node.
10 shards == 500KB of payload to each data node.

On the data node that holds the shard, yes. If you start sending large bulk requests to the cluster, the node that accepts them is going to be put under a lot of pressure.

Thanks.
What we are seeing is queue size limit (2000) exceptions becoming more frequent, and my investigation points to exactly this.
We've increased the shard count for heavier indices, but the bulk write size is still the same, so we've ended up with more frequent bulk writes as data volume increases.

From an individual data node's point of view, each write becomes smaller as the shard count increases (we have a monthly indexing strategy, so I'm talking about a trend over time, not within the same index), while the write frequency has increased over the same period.
This means that as the cluster grows, bulk writes become less and less efficient.

Is this causing an issue?

Yes. Queue size limit exceptions.

What version are you using? Have you deliberately configured this queue to be smaller than normal? All non-EOL versions permit more than 2000 items by default.

Both 7.3.1 & 7.15.0.
The queue size is the default, 2000.
It's a trend I started to see in the past few months, after the cluster had grown over the past few years.

This scenario seems to be an inevitability as a cluster grows. I'm curious how others handle this.

What I have been doing is slowly increasing the bulk write payload, and it does help.
I can tell from my writers how often they write, so this gives me a way to see a potential queue size limit exception coming before it happens.
But the downside is that the buffer size requirement for the writers grows much faster, and will eventually become the bottleneck (roughly the situation sketched below).
Do people typically cap the shard count for an index?
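For illustration, here is a minimal sketch of that kind of writer-side buffer; the endpoint URL and `TARGET_BULK_BYTES` are placeholder values, not anything from this thread. The buffer only flushes once the target bulk size has accumulated, so raising the target to keep per-shard slices reasonable directly raises the memory each writer must hold.

```python
import requests

ES_BULK_URL = "http://localhost:9200/_bulk"  # placeholder endpoint
TARGET_BULK_BYTES = 20 * 1024 * 1024         # grows as the shard count grows

class BulkBuffer:
    """Accumulate bulk actions until the target size is reached, then flush."""

    def __init__(self, target_bytes: int) -> None:
        self.target_bytes = target_bytes
        self.lines: list[bytes] = []
        self.size = 0

    def add(self, index: str, doc_json: bytes) -> None:
        # One action/metadata line plus one document line, NDJSON style.
        action = b'{"index":{"_index":"' + index.encode() + b'"}}\n'
        self.lines += [action, doc_json, b"\n"]
        self.size += len(action) + len(doc_json) + 1
        if self.size >= self.target_bytes:
            self.flush()

    def flush(self) -> None:
        if not self.lines:
            return
        resp = requests.post(
            ES_BULK_URL,
            data=b"".join(self.lines),
            headers={"Content-Type": "application/x-ndjson"},
        )
        resp.raise_for_status()
        self.lines, self.size = [], 0
```

The tradeoff described above falls straight out of this: doubling the shard count (to keep the per-shard slice the same) means roughly doubling `TARGET_BULK_BYTES`, and with it each writer's buffer footprint.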

I can't be the first person hitting this bottleneck. Any help/feedback/suggestions would be appreciated.

What is the output from the _cluster/stats?pretty&human API?

This is not the case. The default in 7.15.0 is 10000. You must be making it smaller in your config.

This is for 7.15.0. I think 7.3.1 doesn't take the "human" parameter.

{
  "_nodes" : {
    "total" : 33,
    "successful" : 33,
    "failed" : 0
  },
  "cluster_name" : "Elasticsearch715_Cluster",
  "cluster_uuid" : "zGrKBe8NRNSPmDiPjCwXmQ",
  "timestamp" : 1649317703819,
  "status" : "green",
  "indices" : {
    "count" : 301,
    "shards" : {
      "total" : 4679,
      "primaries" : 2067,
      "replication" : 1.2636671504596033,
      "index" : {
        "shards" : {
          "min" : 2,
          "max" : 90,
          "avg" : 15.544850498338871
        },
        "primaries" : {
          "min" : 1,
          "max" : 30,
          "avg" : 6.867109634551495
        },
        "replication" : {
          "min" : 1.0,
          "max" : 2.0,
          "avg" : 1.186046511627907
        }
      }
    },
    "docs" : {
      "count" : 100577636088,
      "deleted" : 650936633
    },
    "store" : {
      "size" : "59.9tb",
      "size_in_bytes" : 65910066719042,
      "total_data_set_size" : "59.9tb",
      "total_data_set_size_in_bytes" : 65910066719042,
      "reserved" : "0b",
      "reserved_in_bytes" : 0
    },
    "fielddata" : {
      "memory_size" : "53.2gb",
      "memory_size_in_bytes" : 57186102416,
      "evictions" : 0
    },
    "query_cache" : {
      "memory_size" : "39.1gb",
      "memory_size_in_bytes" : 42087243288,
      "total_count" : 38837013082,
      "hit_count" : 2396895532,
      "miss_count" : 36440117550,
      "cache_size" : 1197679,
      "cache_count" : 29677135,
      "evictions" : 28479456
    },
    "completion" : {
      "size" : "0b",
      "size_in_bytes" : 0
    },
    "segments" : {
      "count" : 115757,
      "memory" : "1.5gb",
      "memory_in_bytes" : 1682768938,
      "terms_memory" : "1011.2mb",
      "terms_memory_in_bytes" : 1060364344,
      "stored_fields_memory" : "91.9mb",
      "stored_fields_memory_in_bytes" : 96412496,
      "term_vectors_memory" : "0b",
      "term_vectors_memory_in_bytes" : 0,
      "norms_memory" : "77.1mb",
      "norms_memory_in_bytes" : 80907264,
      "points_memory" : "0b",
      "points_memory_in_bytes" : 0,
      "doc_values_memory" : "424.4mb",
      "doc_values_memory_in_bytes" : 445084834,
      "index_writer_memory" : "10.4gb",
      "index_writer_memory_in_bytes" : 11229168174,
      "version_map_memory" : "1gb",
      "version_map_memory_in_bytes" : 1094103834,
      "fixed_bit_set" : "136.8kb",
      "fixed_bit_set_memory_in_bytes" : 140184,
      "max_unsafe_auto_id_timestamp" : 1649314801250,
      "file_sizes" : { }
    },
    "mappings" : {
      "field_types" : [
        {
          "name" : "boolean",
          "count" : 562,
          "index_count" : 167,
          "script_count" : 0
        },
        {
          "name" : "byte",
          "count" : 14,
          "index_count" : 7,
          "script_count" : 0
        },
        {
          "name" : "date",
          "count" : 855,
          "index_count" : 281,
          "script_count" : 0
        },
        {
          "name" : "dense_vector",
          "count" : 1,
          "index_count" : 1,
          "script_count" : 0
        },
        {
          "name" : "double",
          "count" : 6,
          "index_count" : 2,
          "script_count" : 0
        },
        {
          "name" : "flattened",
          "count" : 3,
          "index_count" : 3,
          "script_count" : 0
        },
        {
          "name" : "float",
          "count" : 1082,
          "index_count" : 130,
          "script_count" : 0
        },
        {
          "name" : "integer",
          "count" : 60,
          "index_count" : 40,
          "script_count" : 0
        },
        {
          "name" : "keyword",
          "count" : 7793,
          "index_count" : 292,
          "script_count" : 0
        },
        {
          "name" : "long",
          "count" : 1552,
          "index_count" : 216,
          "script_count" : 0
        },
        {
          "name" : "nested",
          "count" : 4,
          "index_count" : 4,
          "script_count" : 0
        },
        {
          "name" : "object",
          "count" : 732,
          "index_count" : 139,
          "script_count" : 0
        },
        {
          "name" : "short",
          "count" : 103,
          "index_count" : 31,
          "script_count" : 0
        },
        {
          "name" : "text",
          "count" : 3860,
          "index_count" : 256,
          "script_count" : 0
        },
        {
          "name" : "version",
          "count" : 4,
          "index_count" : 4,
          "script_count" : 0
        }
      ],
      "runtime_field_types" : [ ]
    },
    "analysis" : {
      "char_filter_types" : [ ],
      "tokenizer_types" : [ ],
      "filter_types" : [ ],
      "analyzer_types" : [ ],
      "built_in_char_filters" : [ ],
      "built_in_tokenizers" : [ ],
      "built_in_filters" : [ ],
      "built_in_analyzers" : [ ]
    },
    "versions" : [
      {
        "version" : "7.3.1",
        "index_count" : 1,
        "primary_shard_count" : 5,
        "total_primary_size" : "21.3gb",
        "total_primary_bytes" : 22926890377
      },
      {
        "version" : "7.15.0",
        "index_count" : 300,
        "primary_shard_count" : 2062,
        "total_primary_size" : "26.6tb",
        "total_primary_bytes" : 29337006822430
      }
    ]
  },
  "nodes" : {
    "count" : {
      "total" : 33,
      "coordinating_only" : 0,
      "data" : 30,
      "data_cold" : 0,
      "data_content" : 0,
      "data_frozen" : 0,
      "data_hot" : 0,
      "data_warm" : 0,
      "ingest" : 3,
      "master" : 3,
      "ml" : 0,
      "remote_cluster_client" : 0,
      "transform" : 0,
      "voting_only" : 0
    },
    "versions" : [
      "7.15.0"
    ],
    "os" : {
      "available_processors" : 384,
      "allocated_processors" : 384,
      "names" : [
        {
          "name" : "Linux",
          "count" : 33
        }
      ],
      "pretty_names" : [
        {
          "pretty_name" : "Ubuntu 20.04.2 LTS",
          "count" : 33
        }
      ],
      "architectures" : [
        {
          "arch" : "amd64",
          "count" : 33
        }
      ],
      "mem" : {
        "total" : "2.8tb",
        "total_in_bytes" : 3101698805760,
        "free" : "86gb",
        "free_in_bytes" : 92400279552,
        "used" : "2.7tb",
        "used_in_bytes" : 3009298526208,
        "free_percent" : 3,
        "used_percent" : 97
      }
    },
    "process" : {
      "cpu" : {
        "percent" : 608
      },
      "open_file_descriptors" : {
        "min" : 1750,
        "max" : 3938,
        "avg" : 3612
      }
    },
    "jvm" : {
      "max_uptime" : "112.9d",
      "max_uptime_in_millis" : 9761769593,
      "versions" : [
        {
          "version" : "16.0.2",
          "vm_name" : "OpenJDK 64-Bit Server VM",
          "vm_version" : "16.0.2+7",
          "vm_vendor" : "Eclipse Foundation",
          "bundled_jdk" : true,
          "using_bundled_jdk" : true,
          "count" : 33
        }
      ],
      "mem" : {
        "heap_used" : "506.4gb",
        "heap_used_in_bytes" : 543817278336,
        "heap_max" : "976.2gb",
        "heap_max_in_bytes" : 1048206901248
      },
      "threads" : 4669
    },
    "fs" : {
      "total" : "203.2tb",
      "total_in_bytes" : 223509312897024,
      "free" : "143.1tb",
      "free_in_bytes" : 157359922655232,
      "available" : "132.8tb",
      "available_in_bytes" : 146093262921728
    },
    "plugins" : [
      {
        "name" : "repository-s3",
        "version" : "7.15.0",
        "elasticsearch_version" : "7.15.0",
        "java_version" : "1.8",
        "description" : "The S3 repository plugin adds S3 repositories",
        "classname" : "org.elasticsearch.repositories.s3.S3RepositoryPlugin",
        "extended_plugins" : [ ],
        "has_native_controller" : false,
        "licensed" : false,
        "type" : "isolated"
      }
    ],
    "network_types" : {
      "transport_types" : {
        "netty4" : 33
      },
      "http_types" : {
        "netty4" : 33
      }
    },
    "discovery_types" : {
      "zen" : 33
    },
    "packaging_types" : [
      {
        "flavor" : "default",
        "type" : "deb",
        "count" : 33
      }
    ],
    "ingest" : {
      "number_of_pipelines" : 2,
      "processor_stats" : {
        "gsub" : {
          "count" : 0,
          "failed" : 0,
          "current" : 0,
          "time" : "0s",
          "time_in_millis" : 0
        },
        "script" : {
          "count" : 0,
          "failed" : 0,
          "current" : 0,
          "time" : "0s",
          "time_in_millis" : 0
        }
      }
    }
  }
}

Sorry, it's 200 for 7.3.1, not 2000.
I am not sure about 7.15.0, since I haven't changed the queue size setting on either cluster. I'm not sure if there's an easy way to dump the setting out with Kibana.
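One way to check without digging through config files (a sketch over plain HTTP; the host is a placeholder) is to ask each node for its write thread pool queue size and rejection count via the cat thread pool API:

```python
import requests

ES = "http://localhost:9200"  # placeholder; point it at either cluster

# Per-node write thread pool: current queue depth, configured queue size,
# and cumulative rejection count.
r = requests.get(
    f"{ES}/_cat/thread_pool/write",
    params={"v": "true", "h": "node_name,name,queue,queue_size,rejected"},
)
print(r.text)
```

The same request can be pasted into Kibana Dev Tools as `GET _cat/thread_pool/write?v&h=node_name,name,queue,queue_size,rejected`.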

We typically use a double-write strategy to upgrade, which is why we have two clusters on different versions right now.
You might be right about the larger queue size in 7.15.0. After reading the logs more carefully, I think all the queue size limit exceptions were from 7.3.1.
The writers are double-writing to both clusters during this upgrade transition, so I had assumed it was happening on both clusters. My bad.

Maybe the larger queue size in the later version works well, but this still seems like a potential issue as the cluster keeps growing.

The increase in write frequency will eventually undermine the efficiency of bulk writes.
With an index of 100 shards, each data node receives only a 50KB payload from a 5MB bulk request, which seems too small.

If I give my ingest nodes a huge amount of RAM, say 1TB (as an example), would that raise the 5MB bulk write payload recommendation? Or is there something behind the scenes that makes it counterproductive once the payload size exceeds some threshold, like the < 32GB heap recommendation for Java?
Our ingest nodes are not data nodes, so they don't store any data.

The guideline about 5MB write sizes is really just a guideline that works well for most people, but most people don't have 100 shards in an index. When you get to that sort of scale you will not find any generic guidelines to be helpful. The only way to get accurate answers is to do careful benchmarking of your actual workload. You may find that Elasticsearch can tolerate much larger writes in your case without a problem, but then again you may not. It depends on exactly how you're using it.
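For what it's worth, a rough starting point for that kind of benchmark might look like the sketch below, which simply times identical synthetic bulk payloads at different sizes against a test index. The host, index name, and document shape are placeholders, and a purpose-built tool such as Rally would be the more rigorous option:

```python
import json
import time
import requests

ES = "http://localhost:9200"           # placeholder
INDEX = "bulk-size-benchmark"          # placeholder test index
DOC = json.dumps({"message": "x" * 1024, "value": 42})  # ~1KB synthetic doc

def run_bulk(total_bytes: int) -> float:
    """Send one bulk request of roughly total_bytes and return elapsed seconds."""
    action = json.dumps({"index": {"_index": INDEX}})
    line = f"{action}\n{DOC}\n"
    body = line * max(1, total_bytes // len(line))
    start = time.monotonic()
    resp = requests.post(
        f"{ES}/_bulk",
        data=body.encode(),
        headers={"Content-Type": "application/x-ndjson"},
    )
    resp.raise_for_status()
    if resp.json().get("errors"):
        raise RuntimeError("some bulk items failed")
    return time.monotonic() - start

for mb in (1, 5, 10, 20):
    elapsed = run_bulk(mb * 1024 * 1024)
    print(f"{mb:>3}MB bulk took {elapsed:.2f}s "
          f"({mb / elapsed:.1f} MB/s from this single writer)")
```

Run against realistic documents, concurrency, and shard counts, this kind of measurement shows whether larger-than-5MB requests actually help or merely shift pressure onto the coordinating node.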


It might help if you describe your use case and sharding strategy. Are you indexing immutable data or performing updates/deletes? Are you using time-based indices? What is your retention policy? How many different indices are you actively indexing into?

Thanks for the suggestions. That's what I figured.
We don't have 100 shards yet, but we will eventually get there.
We have different types of indices. Some hold stats for devices, so updates are the norm.
Others hold events, so rollover indices are the norm. We also break the event indices up by month, so each monthly index is rolled over based on shard size (roughly the kind of policy sketched below).
With stats, we can control how often we write.
But events are a crapshoot: they arrive whenever events happen, and it's much harder to anticipate event surges.

Naturally, when the number of "customers" (input sources) increases, the event count jumps non-linearly.
So the natural way to scale (for most folks, I think) is to add more nodes and bump up the shard count, so the next index (rollover or new monthly index) naturally has more capacity.
But the writers don't get the same scaling strategy today; we simply have a semi-fixed buffer before each bulk write.
It's not hard to see the potential issue (which we have experienced) of sending too many bulk writes and exceeding the queue size.
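For readers following along, the shard-size-based rollover mentioned above can be expressed as an ILM policy along these lines (available from 7.13 onward). The policy name and thresholds here are made-up placeholders, not necessarily what this cluster uses:

```python
import requests

ES = "http://localhost:9200"  # placeholder

# Hypothetical ILM policy: roll the event index over once any primary shard
# reaches 50GB, or the index is 30 days old, whichever comes first.
policy = {
    "policy": {
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {
                        "max_primary_shard_size": "50gb",
                        "max_age": "30d",
                    }
                }
            }
        }
    }
}

resp = requests.put(f"{ES}/_ilm/policy/monthly-events-rollover", json=policy)
resp.raise_for_status()
print(resp.json())
```

Rolling over on `max_primary_shard_size` keeps individual shards near a target size even as monthly volume grows, which is the behaviour described in the post above.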

I believe we will eventually need to break the cluster up into multiple clusters to solve this issue.
Since you folks run a cloud service, I imagine its capacity must be much greater than our own deployment. I just want to pick some brains; maybe there's a well-known scaling method I'm not aware of?

It sounds like you're only seeing problems in your ancient 7.3 cluster, so the obvious first thing to try is to stop using such an old version. I expect an upgrade to 7.17 will make a much bigger difference than any tuning of the parameters you're asking about.

How much data are you expecting to be dealing with anyway? I know of clusters that are ingesting multiple TBs per hour with months of retention and they aren't even close to needing 100 shards to support the ingest load.

We get lots of events; we can only retain them for about 6 weeks with our current strategy.
Since we use monthly indices, retention of old data isn't really in the picture here. It's the amount of data per month that dictates our sharding; keeping older data simply increases the total shard count and storage requirement, so we decided on a shorter retention.
I will let 7.15 run for a while and see if it's indeed no longer an issue. Thanks a lot.

Could you be more precise? Roughly how many events per second do you count as "lots"?