Question about data stream rollover timings

We're considering moving some large ES clusters from time-based indices to data streams. One concern we have is that with time-based indices we notice a delay between when the first event arrives and when the shards are fully initialized. This can result in processing delays.

To solve the problem with time-based indices we pre-create them several hours ahead of time to allow for the shards to initialize and all be ready for data when the clock hits midnight UTC.
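For illustration, the pre-creation step amounts to something like this (the index name, shard and replica counts, and host are placeholders, not our actual settings):

```shell
# Sketch: create tomorrow's index a few hours early so all shards are
# initialized before the clock hits midnight UTC.
curl -X PUT "http://localhost:9200/logs-2023.02.11" \
  -H 'Content-Type: application/json' -d '
{
  "settings": {
    "index.number_of_shards": 300,
    "index.number_of_replicas": 1
  }
}'
```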

If we switch to data streams we'll lose the ability to pre-create the backing indices ahead of time. So my question is: With data stream rollovers does ES wait for all shards to be available before it "switches" to the new backing index or will there be a delay where we're potentially writing all data to a subset of configured shards?

Welcome to our community! :smiley:

What is the output from the _cluster/stats?pretty&human API? It sounds like you have other things that are impacting this, and moving to ILM or data streams may or may not help.

Sure. This is from one of several larger clusters:

{
  "_nodes" : {
    "total" : 83,
    "successful" : 83,
    "failed" : 0
  },
  "cluster_name" : "search",
  "cluster_uuid" : "uuid",
  "timestamp" : 1675984748796,
  "status" : "green",
  "indices" : {
    "count" : 52,
    "shards" : {
      "total" : 5828,
      "primaries" : 4299,
      "replication" : 0.3556641079320772,
      "index" : {
        "shards" : {
          "min" : 2,
          "max" : 600,
          "avg" : 112.07692307692308
        },
        "primaries" : {
          "min" : 1,
          "max" : 300,
          "avg" : 82.67307692307692
        },
        "replication" : {
          "min" : 0.0,
          "max" : 1.0,
          "avg" : 0.6923076923076923
        }
      }
    },
    "docs" : {
      "count" : 112020108931,
      "deleted" : 3346572
    },
    "store" : {
      "size" : "169.1tb",
      "size_in_bytes" : 186025596218883,
      "total_data_set_size" : "169.1tb",
      "total_data_set_size_in_bytes" : 186025596218883,
      "reserved" : "0b",
      "reserved_in_bytes" : 0
    },
    "fielddata" : {
      "memory_size" : "9.9gb",
      "memory_size_in_bytes" : 10666961688,
      "evictions" : 0
    },
    "query_cache" : {
      "memory_size" : "45.7gb",
      "memory_size_in_bytes" : 49076185424,
      "total_count" : 47808947291,
      "hit_count" : 1728121080,
      "miss_count" : 46080826211,
      "cache_size" : 3130937,
      "cache_count" : 10033128,
      "evictions" : 6902191
    },
    "completion" : {
      "size" : "0b",
      "size_in_bytes" : 0
    },
    "segments" : {
      "count" : 246294,
      "memory" : "48.9gb",
      "memory_in_bytes" : 52592200866,
      "terms_memory" : "44.1gb",
      "terms_memory_in_bytes" : 47397497032,
      "stored_fields_memory" : "359.2mb",
      "stored_fields_memory_in_bytes" : 376740576,
      "term_vectors_memory" : "0b",
      "term_vectors_memory_in_bytes" : 0,
      "norms_memory" : "381kb",
      "norms_memory_in_bytes" : 390208,
      "points_memory" : "0b",
      "points_memory_in_bytes" : 0,
      "doc_values_memory" : "4.4gb",
      "doc_values_memory_in_bytes" : 4817573050,
      "index_writer_memory" : "28.5gb",
      "index_writer_memory_in_bytes" : 30627134948,
      "version_map_memory" : "737.9mb",
      "version_map_memory_in_bytes" : 773800392,
      "fixed_bit_set" : "188.3kb",
      "fixed_bit_set_memory_in_bytes" : 192880,
      "max_unsafe_auto_id_timestamp" : 1675958471325,
      "file_sizes" : { }
    },
    "mappings" : {
      "field_types" : [
        {
          "name" : "binary",
          "count" : 8,
          "index_count" : 1,
          "script_count" : 0
        },
        {
          "name" : "boolean",
          "count" : 4333,
          "index_count" : 27,
          "script_count" : 0
        },
        {
          "name" : "constant_keyword",
          "count" : 6,
          "index_count" : 2,
          "script_count" : 0
        },
        {
          "name" : "date",
          "count" : 1925,
          "index_count" : 47,
          "script_count" : 0
        },
        {
          "name" : "flattened",
          "count" : 75,
          "index_count" : 37,
          "script_count" : 0
        },
        {
          "name" : "float",
          "count" : 319,
          "index_count" : 27,
          "script_count" : 0
        },
        {
          "name" : "geo_point",
          "count" : 74,
          "index_count" : 37,
          "script_count" : 0
        },
        {
          "name" : "integer",
          "count" : 23,
          "index_count" : 1,
          "script_count" : 0
        },
        {
          "name" : "ip",
          "count" : 150,
          "index_count" : 39,
          "script_count" : 0
        },
        {
          "name" : "keyword",
          "count" : 33658,
          "index_count" : 47,
          "script_count" : 0
        },
        {
          "name" : "long",
          "count" : 6550,
          "index_count" : 45,
          "script_count" : 0
        },
        {
          "name" : "match_only_text",
          "count" : 387,
          "index_count" : 37,
          "script_count" : 0
        },
        {
          "name" : "nested",
          "count" : 13,
          "index_count" : 5,
          "script_count" : 0
        },
        {
          "name" : "object",
          "count" : 8384,
          "index_count" : 47,
          "script_count" : 0
        },
        {
          "name" : "text",
          "count" : 30486,
          "index_count" : 47,
          "script_count" : 0
        },
        {
          "name" : "version",
          "count" : 4,
          "index_count" : 4,
          "script_count" : 0
        },
        {
          "name" : "wildcard",
          "count" : 81,
          "index_count" : 27,
          "script_count" : 0
        }
      ],
      "runtime_field_types" : [ ]
    },
    "analysis" : {
      "char_filter_types" : [ ],
      "tokenizer_types" : [ ],
      "filter_types" : [ ],
      "analyzer_types" : [ ],
      "built_in_char_filters" : [ ],
      "built_in_tokenizers" : [ ],
      "built_in_filters" : [ ],
      "built_in_analyzers" : [ ]
    },
    "versions" : [
      {
        "version" : "7.17.0",
        "index_count" : 52,
        "primary_shard_count" : 4299,
        "total_primary_size" : "134.2tb",
        "total_primary_bytes" : 147579437634898
      }
    ]
  },
  "nodes" : {
    "count" : {
      "total" : 83,
      "coordinating_only" : 0,
      "data" : 65,
      "data_cold" : 0,
      "data_content" : 0,
      "data_frozen" : 0,
      "data_hot" : 0,
      "data_warm" : 15,
      "ingest" : 65,
      "master" : 3,
      "ml" : 83,
      "remote_cluster_client" : 0,
      "transform" : 0,
      "voting_only" : 0
    },
    "versions" : [
      "7.17.0"
    ],
    "os" : {
      "available_processors" : 1304,
      "allocated_processors" : 1304,
      "names" : [
        {
          "name" : "Linux",
          "count" : 83
        }
      ],
      "pretty_names" : [
        {
          "pretty_name" : "Ubuntu 20.04.4 LTS",
          "count" : 72
        },
        {
          "pretty_name" : "Ubuntu 20.04.5 LTS",
          "count" : 11
        }
      ],
      "architectures" : [
        {
          "arch" : "amd64",
          "count" : 83
        }
      ],
      "mem" : {
        "total" : "9.7tb",
        "total_in_bytes" : 10686145216512,
        "free" : "263.5gb",
        "free_in_bytes" : 282970009600,
        "used" : "9.4tb",
        "used_in_bytes" : 10403175206912,
        "free_percent" : 3,
        "used_percent" : 97
      }
    },
    "process" : {
      "cpu" : {
        "percent" : 4856
      },
      "open_file_descriptors" : {
        "min" : 2308,
        "max" : 6115,
        "avg" : 3774
      }
    },
    "jvm" : {
      "max_uptime" : "198d",
      "max_uptime_in_millis" : 17114139193,
      "versions" : [
        {
          "version" : "11.0.17",
          "vm_name" : "OpenJDK 64-Bit Server VM",
          "vm_version" : "11.0.17+8-post-Ubuntu-1ubuntu220.04",
          "vm_vendor" : "Ubuntu",
          "bundled_jdk" : true,
          "using_bundled_jdk" : false,
          "count" : 16
        },
        {
          "version" : "11.0.16",
          "vm_name" : "OpenJDK 64-Bit Server VM",
          "vm_version" : "11.0.16+8-post-Ubuntu-0ubuntu120.04",
          "vm_vendor" : "Ubuntu",
          "bundled_jdk" : true,
          "using_bundled_jdk" : false,
          "count" : 5
        },
        {
          "version" : "11.0.15",
          "vm_name" : "OpenJDK 64-Bit Server VM",
          "vm_version" : "11.0.15+10-Ubuntu-0ubuntu0.20.04.1",
          "vm_vendor" : "Private Build",
          "bundled_jdk" : true,
          "using_bundled_jdk" : false,
          "count" : 62
        }
      ],
      "mem" : {
        "heap_used" : "1.3tb",
        "heap_used_in_bytes" : 1438226850576,
        "heap_max" : "2.5tb",
        "heap_max_in_bytes" : 2763270914048
      },
      "threads" : 176126
    },
    "fs" : {
      "total" : "363.5tb",
      "total_in_bytes" : 399712820109312,
      "free" : "194.1tb",
      "free_in_bytes" : 213493417062400,
      "available" : "194.1tb",
      "available_in_bytes" : 213492024541184
    },
    "plugins" : [
      {
        "name" : "repository-s3",
        "version" : "7.17.0",
        "elasticsearch_version" : "7.17.0",
        "java_version" : "1.8",
        "description" : "The S3 repository plugin adds S3 repositories",
        "classname" : "org.elasticsearch.repositories.s3.S3RepositoryPlugin",
        "extended_plugins" : [ ],
        "has_native_controller" : false,
        "licensed" : false,
        "type" : "isolated"
      }
    ],
    "network_types" : {
      "transport_types" : {
        "security4" : 83
      },
      "http_types" : {
        "security4" : 83
      }
    },
    "discovery_types" : {
      "zen" : 83
    },
    "packaging_types" : [
      {
        "flavor" : "default",
        "type" : "deb",
        "count" : 83
      }
    ],
    "ingest" : {
      "number_of_pipelines" : 3,
      "processor_stats" : {
        "geoip" : {
          "count" : 4172206080879,
          "failed" : 20418366267,
          "current" : 0,
          "time" : "280.4d",
          "time_in_millis" : 24231041795
        },
        "gsub" : {
          "count" : 0,
          "failed" : 0,
          "current" : 0,
          "time" : "0s",
          "time_in_millis" : 0
        },
        "rename" : {
          "count" : 4151787714610,
          "failed" : 0,
          "current" : 1,
          "time" : "19.2d",
          "time_in_millis" : 1661571609
        },
        "script" : {
          "count" : 0,
          "failed" : 0,
          "current" : 0,
          "time" : "0s",
          "time_in_millis" : 0
        },
        "user_agent" : {
          "count" : 1058462300300,
          "failed" : 97005380,
          "current" : 0,
          "time" : "51.8d",
          "time_in_millis" : 4482458993
        }
      }
    }
  }
}

Just to be clear: I'm not trying to solve any problems by switching to data streams. Our clusters are humming along fine using timestamped indices; our lives would just be easier if we let data streams handle rollovers. When I say we pre-create the indices several hours ahead of time, it's not because it takes several hours to initialize the shards. It only takes 5 minutes or so... we just don't want to delay ingest for 5 minutes.

It's concerning that you need to do this at all. The last time I saw this behavior was in 5.x, due to cluster state management inefficiencies.

Can you try creating an index and then grabbing hot_threads and the logs from the master node so we can see what it is doing?
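Something along these lines should capture it (index name and host are placeholders):

```shell
# 1. Create a throwaway test index, 2. snapshot hot threads while its shards
# initialize, 3. watch the shard states until everything is STARTED.
curl -X PUT "http://localhost:9200/shard-init-test"
curl "http://localhost:9200/_nodes/hot_threads?threads=9999" > hot_threads.txt
curl "http://localhost:9200/_cat/shards/shard-init-test?v"
```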

These clusters are pretty big, each ingesting >10TB/day. The hot threads output is too large to post here. Is there something in particular I should be looking for?

I'm also just curious about the answer to my original question. When a data stream rolls over, does the new backing index start receiving writes before its shards are fully initialized? To keep our shards under the recommended max size we're creating each index with 300 shards across 80+ nodes.
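For context, with data streams the backing indices would get their shard count from a matching composable index template; a minimal sketch (template name, pattern, and counts are placeholders):

```shell
# An index template with a data_stream block: every backing index created by
# a rollover inherits these settings.
curl -X PUT "http://localhost:9200/_index_template/logs-template" \
  -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["logs-stream*"],
  "data_stream": {},
  "template": {
    "settings": {
      "index.number_of_shards": 300,
      "index.number_of_replicas": 1
    }
  }
}'
```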

It sounds like the cluster is under constant heavy load, which may explain why changes to the cluster state and shard allocation take a long time to complete.

I do not know the answer to this, but am also very interested in learning more about the internals.

It seems like you are updating and/or deleting data in at least some of the indices. Note that data streams are primarily designed for immutable data, and performing updates or deletes against them can be inefficient or impractical.

The beauty of data streams and rollover (used behind the scenes) is that indices no longer cover specific time periods and new indices can be created based on size. This means you can have backing indices covering just a few hours during peak periods and longer spans when volumes are lower, since each index can be cut close to the ideal size. You therefore no longer need to overprovision the number of primary shards and can set it to the number of indexing nodes.
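A size-based rollover is configured in the hot phase of an ILM policy; a sketch (the policy name and thresholds are illustrative, with max_age as a backstop for quiet periods):

```shell
# Roll over to a new backing index once any primary shard reaches 50gb,
# or after 7 days, whichever comes first.
curl -X PUT "http://localhost:9200/_ilm/policy/logs-policy" \
  -H 'Content-Type: application/json' -d '
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb",
            "max_age": "7d"
          }
        }
      }
    }
  }
}'
```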

This mechanism is also what makes updating or deleting more difficult: unlike old time-based indices with the date in the name, you cannot tell which backing index a document resides in based on the event time.

