ES Cloud - Running ingest pipelines on warm nodes?

Hi there,

I have a hot/warm cluster running on ES cloud with the following setup:

  • 2x 29GB hot nodes
  • 2x 29GB warm nodes
  • time series data
  • ingest rate: 2500 docs/s (350 bytes each), ~75GB/day
  • enrich pipeline (with an enrich snapshot of ~350k records)
  • 3 transforms daisy-chained to roll up by the minute and then by 15 minutes

The problem:
When I activate the pipeline, my entry index starts to backlog until it can no longer keep up with incoming data.

Tests:
I ran this on a single-AZ 29GB instance and it works flawlessly.

Question:
Why does ES Cloud allow ingest to happen on warm nodes? It seems that these d2 instances are not configured to handle this kind of load.

Thoughts?


Have you contacted Elastic Cloud support about this?

I did. They want me to pay for a support plan. They feel that I am doing something wrong and have pointed me to docs on how to tune the cluster.

I've done some more testing and it's not just related to warm nodes, but ANY nodes. I am unable to run this enrich processor with 2x 58GB nodes in the same AZ. It only works on a single-node cluster.

This is related to Enrich Processor is slow on multi nodes

Hi Gents,

Can we keep this in 1 thread please, thx.

@Alain_St_Pierre @Mike_Connor

@warkolm can you merge these 2 threads? or close the other one.

@Alain_St_Pierre thanks for answering on the other thread; that helped me understand a few things. It sounded like someone was executing the force merge on their own, etc.

I am not sure whether I can help you or not, but if you are up for it I will ask some basic questions and perhaps we can get to the bottom of this. Keep in mind I am a volunteer here and I have a day job.

Elastic certainly has other users running enrich on multi node clusters. (if I have a chance I will work up a sample)

You are running on Elastic Cloud from elastic.co, not AWS ES, correct?

Have you added / changed any of the default settings for Elasticsearch?

Can we try to debug on just 2 hot nodes?
(Not the hot/warm architecture; we can perhaps get to that later. Data should not be ingested on warm nodes, but that is an additional topic related to index lifecycle management.)

In the dev tools can you run the following and provide outputs after you have set up the 2 nodes with the correct enrich policy.

GET /_cat/indices/.e*?v

GET _cat/aliases/.e*

GET _cat/shards/.e*?v

Can you provide the enrich policy?

Can you provide the ingest pipeline?

What tools are used to ingest the main time series data?
How is it configured?

How are you calling the pipeline?


Also can you provide the settings for your time series data

GET <timeseriesindex>/_settings?include_defaults

BTW, I looked at the index settings in the other thread....

That allocation setting is why ingest is going to all nodes, not just hot. There is no routing to hot nodes, which is why ingest is most likely going to any node, and that is not what you want.

     "routing": {
        "rebalance": {
          "enable": "all"
        },
        "allocation": {
          "enable": "all",
          "total_shards_per_node": "-1"
        }
      },

There should be an allocation setting that looks something like this. It should be applied to any newly created index on an ES Cloud hot/warm cluster, unless it was specifically removed or the settings were manually created without it.

{
  "zip-enriched" : {
    "settings" : {
      "index" : {
        "routing" : {
          "allocation" : { <<--------
            "require" : { <<--------
              "data" : "hot" <<--------
            }
          }
        },
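
If that setting is missing from an existing index, it can be added in place. A sketch, assuming the `zip-enriched` index name from the example above and the standard ES Cloud `data` node attribute (adjust the index name to your own):

```
PUT zip-enriched/_settings
{
  "index.routing.allocation.require.data": "hot"
}
```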

I also created a hot/warm cluster and did an enrich test, probably not as sophisticated as yours, but I easily got up to a 10K/s ingest rate.

Thanks @stephenb for your help on this. Truly appreciate your time...

Yes, I have this cluster set up on elastic.co. I don't think there is anything out of the ordinary for this setup. We have a couple of continuous transforms running and ILM policies set up on some of the indices to manage disk space. The newly provisioned cluster has been set up using a hot-only architecture (as there wasn't a way to downgrade/remove the warm nodes in the hot/warm pattern I used initially).

I will provide the current outputs of the single node setup, then spin up another node and provide the same after...

GET /_cat/indices/.e*?v

health status index                               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .enrich-enrich_vessel-1609893468927 Fu5mGDdUSIO5PLdY-KTq2A   1   0     361842            0     24.7mb         24.7mb

GET _cat/aliases/.e*

.enrich-enrich_vessel .enrich-enrich_vessel-1609893468927 - - - -

GET _cat/shards/.e*?v

index                               shard prirep state     docs  store ip            node
.enrich-enrich_vessel-1609893468927 0     p      STARTED 361842 24.7mb 172.25.84.100 instance-0000000013

Enrich policy is:

{
    "match": {
        "indices": "vessel",
        "match_field": "mmsi",
        "enrich_fields": [
          "callsign",
          "shipname",
          "shiptype",
          "imo",
          "mothership_mmsi",
          "width",
          "length",
          "ais_msg_type",
          "event_ts"
        ]
    }
}
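
For reference, a policy body like that is typically registered and then executed so that the `.enrich-*` system index gets built. A sketch, assuming the policy name `enrich_vessel` used elsewhere in this thread:

```
PUT /_enrich/policy/enrich_vessel
{
  "match": {
    "indices": "vessel",
    "match_field": "mmsi",
    "enrich_fields": [
      "callsign", "shipname", "shiptype", "imo",
      "mothership_mmsi", "width", "length",
      "ais_msg_type", "event_ts"
    ]
  }
}

POST /_enrich/policy/enrich_vessel/_execute
```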

track pipeline:

{
  "description": "Handles incoming position log events going to track index",
  "processors": [
    {
      "pipeline": {
        "name": "timestamps",
        "tag": "timestamps_track"
      }
    },
    {
      "pipeline": {
        "name": "enrich_vessel",
        "tag": "vessel_enrich_track"
      }
    }
  ]
}

enrich_vessel pipeline:

{
  "description": "This pipeline adds vessel information to position messages.",
  "processors": [
    {
      "enrich": {
        "policy_name": "enrich_vessel",
        "field": "mmsi",
        "target_field": "vessel"
      }
    }
  ]
}

I am using Filebeats to send events to ES. Here is the output of the config:

output.elasticsearch:
  hosts: ["${ELASTIC_HOST}"]
  username: "${ELASTIC_USERNAME}"
  password: "${ELASTIC_PASSWORD}"
  bulk_max_size: 1000
  pipeline: "%{[@indexmeta.idx]}"
  indices:
    - index: "%{[@indexmeta.idx]}"

@indexmeta.idx in this case is track

Here are the settings of the current write index:

"track-000032" : {
    "settings" : {
      "index" : {
        "lifecycle" : {
          "name" : "ilm_track",
          "rollover_alias" : "track"
        },
        "number_of_shards" : "1",
        "provided_name" : "track-000032",
        "creation_date" : "1609920008341",
        "priority" : "100",
        "number_of_replicas" : "1",
        "uuid" : "JjLw6xnVRpqekY9okrFFyQ",
        "version" : {
          "created" : "7090399"
        }
      }
    },
    "defaults" : {
      "index" : {
        "flush_after_merge" : "512mb",
        "final_pipeline" : "_none",
        "max_inner_result_window" : "100",
        "unassigned" : {
          "node_left" : {
            "delayed_timeout" : "1m"
          }
        },
        "max_terms_count" : "65536",
        "lifecycle" : {
          "parse_origination_date" : "false",
          "indexing_complete" : "false",
          "origination_date" : "-1"
        },
        "routing_partition_size" : "1",
        "force_memory_term_dictionary" : "false",
        "max_docvalue_fields_search" : "100",
        "merge" : {
          "scheduler" : {
            "max_thread_count" : "4",
            "auto_throttle" : "true",
            "max_merge_count" : "9"
          },
          "policy" : {
            "reclaim_deletes_weight" : "2.0",
            "floor_segment" : "2mb",
            "max_merge_at_once_explicit" : "30",
            "max_merge_at_once" : "10",
            "max_merged_segment" : "5gb",
            "expunge_deletes_allowed" : "10.0",
            "segments_per_tier" : "10.0",
            "deletes_pct_allowed" : "33.0"
          }
        },
        "max_refresh_listeners" : "1000",
        "max_regex_length" : "1000",
        "load_fixed_bitset_filters_eagerly" : "true",
        "number_of_routing_shards" : "1",
        "write" : {
          "wait_for_active_shards" : "1"
        },
        "verified_before_close" : "false",
        "mapping" : {
          "coerce" : "false",
          "nested_fields" : {
            "limit" : "50"
          },
          "depth" : {
            "limit" : "20"
          },
          "field_name_length" : {
            "limit" : "9223372036854775807"
          },
          "total_fields" : {
            "limit" : "1000"
          },
          "nested_objects" : {
            "limit" : "10000"
          },
          "ignore_malformed" : "false"
        },
        "source_only" : "false",
        "soft_deletes" : {
          "enabled" : "false",
          "retention" : {
            "operations" : "0"
          },
          "retention_lease" : {
            "period" : "12h"
          }
        },
        "max_script_fields" : "32",
        "query" : {
          "default_field" : [
            "*"
          ],
          "parse" : {
            "allow_unmapped_fields" : "true"
          }
        },
        "format" : "0",
        "frozen" : "false",
        "sort" : {
          "missing" : [ ],
          "mode" : [ ],
          "field" : [ ],
          "order" : [ ]
        },
        "codec" : "default",
        "max_rescore_window" : "10000",
        "max_adjacency_matrix_filters" : "100",
        "analyze" : {
          "max_token_count" : "10000"
        },
        "gc_deletes" : "60s",
        "top_metrics_max_size" : "10",
        "optimize_auto_generated_id" : "true",
        "max_ngram_diff" : "1",
        "hidden" : "false",
        "translog" : {
          "generation_threshold_size" : "64mb",
          "flush_threshold_size" : "512mb",
          "sync_interval" : "5s",
          "retention" : {
            "size" : "512MB",
            "age" : "12h"
          },
          "durability" : "REQUEST"
        },
        "auto_expand_replicas" : "false",
        "mapper" : {
          "dynamic" : "true"
        },
        "recovery" : {
          "type" : ""
        },
        "requests" : {
          "cache" : {
            "enable" : "true"
          }
        },
        "data_path" : "",
        "highlight" : {
          "max_analyzed_offset" : "1000000"
        },
        "routing" : {
          "rebalance" : {
            "enable" : "all"
          },
          "allocation" : {
            "enable" : "all",
            "total_shards_per_node" : "-1"
          }
        },
        "search" : {
          "slowlog" : {
            "level" : "TRACE",
            "threshold" : {
              "fetch" : {
                "warn" : "-1",
                "trace" : "-1",
                "debug" : "-1",
                "info" : "-1"
              },
              "query" : {
                "warn" : "-1",
                "trace" : "-1",
                "debug" : "-1",
                "info" : "-1"
              }
            }
          },
          "idle" : {
            "after" : "30s"
          },
          "throttled" : "false"
        },
        "fielddata" : {
          "cache" : "node"
        },
        "default_pipeline" : "_none",
        "max_slices_per_scroll" : "1024",
        "shard" : {
          "check_on_startup" : "false"
        },
        "xpack" : {
          "watcher" : {
            "template" : {
              "version" : ""
            }
          },
          "version" : "",
          "ccr" : {
            "following_index" : "false"
          }
        },
        "percolator" : {
          "map_unmapped_fields_as_text" : "false"
        },
        "allocation" : {
          "max_retries" : "5",
          "existing_shards_allocator" : "gateway_allocator"
        },
        "refresh_interval" : "1s",
        "indexing" : {
          "slowlog" : {
            "reformat" : "true",
            "threshold" : {
              "index" : {
                "warn" : "-1",
                "trace" : "-1",
                "debug" : "-1",
                "info" : "-1"
              }
            },
            "source" : "1000",
            "level" : "TRACE"
          }
        },
        "compound_format" : "0.1",
        "blocks" : {
          "metadata" : "false",
          "read" : "false",
          "read_only_allow_delete" : "false",
          "read_only" : "false",
          "write" : "false"
        },
        "max_result_window" : "10000",
        "store" : {
          "stats_refresh_interval" : "10s",
          "type" : "",
          "fs" : {
            "fs_lock" : "native"
          },
          "preload" : [ ]
        },
        "queries" : {
          "cache" : {
            "enabled" : "true"
          }
        },
        "warmer" : {
          "enabled" : "true"
        },
        "max_shingle_diff" : "3",
        "query_string" : {
          "lenient" : "false"
        }
      }
    }

Quick question off the top: is the above a single-node cluster? If not, how many nodes?
I have some other things to do... I will take a look a bit later...

Here is the index template that I apply (ignore the mappings, as I dynamically add those in a Python script):

{
  "index_patterns": ["track-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "lifecycle.name": "ilm_track",
    "lifecycle.rollover_alias": "track"
  },
  "mappings" : {
    "dynamic": false,
    "properties" : "This text gets replaced"
  }
}
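
If the goal is to keep new track-* indices on the hot tier, one option (a sketch, assuming the same `data: hot` node attribute shown in the earlier example; the thread has not yet confirmed this is the right fix) would be to add the allocation requirement to this template:

```
{
  "index_patterns": ["track-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "lifecycle.name": "ilm_track",
    "lifecycle.rollover_alias": "track",
    "routing.allocation.require.data": "hot"
  }
}
```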

@stephenb, yes those are single node settings... I am about to spin up another node and I'll report the new settings after.

Understood / and I would expect that the .enrich-enrich_vessel index should be promoted to have a replica when you go to 2 nodes...

GET /_cat/indices/.e*?v
GET _cat/shards/.e*?v

So it looks like I should add this to my track template if I want to use hot/warm (once we sort this out in hot). Will this tell the warm nodes not to process ingest pipelines even though the warm nodes have the ingest node attribute?

That's exactly what I'll check...

One thing at a time... that should be added automatically with a proper hot/warm setup. Perhaps because we promoted from hot-only to hot/warm, that did not happen.
Let's take it one step at a time and get the 2 hot nodes working.

What version of the cluster are you running?

I am seeing something missing from your index settings that I would expect with a 7.10 cluster...

v7.9.3

Here are the settings with the two hot nodes:

# GET /_cat/indices/.e*?v
health status index                               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   .enrich-enrich_vessel-1609893468927 Fu5mGDdUSIO5PLdY-KTq2A   1   1     361842            0     24.7mb         24.7mb

# GET _cat/aliases/.e*
.enrich-enrich_vessel .enrich-enrich_vessel-1609893468927 - - - -

# GET _cat/shards/.e*?v
index                               shard prirep state        docs  store ip            node
.enrich-enrich_vessel-1609893468927 0     p      STARTED    361842 24.7mb 172.25.84.100 instance-0000000013
.enrich-enrich_vessel-1609893468927 0     r      UNASSIGNED                             

# GET .e*/_settings?include_defaults
{
  ".enrich-enrich_vessel-1609893468927" : {
    "settings" : {
      "index" : {
        "refresh_interval" : "-1",
        "number_of_shards" : "1",
        "auto_expand_replicas" : "0-all",
        "blocks" : {
          "write" : "true"
        },
        "provided_name" : ".enrich-enrich_vessel-1609893468927",
        "creation_date" : "1609893468928",
        "number_of_replicas" : "1",
        "uuid" : "Fu5mGDdUSIO5PLdY-KTq2A",
        "version" : {
          "created" : "7090399"
        },
        "warmer" : {
          "enabled" : "false"
        }
      }
    },
    "defaults" : {
      "index" : {
        "flush_after_merge" : "512mb",
        "final_pipeline" : "_none",
        "max_inner_result_window" : "100",
        "unassigned" : {
          "node_left" : {
            "delayed_timeout" : "1m"
          }
        },
        "max_terms_count" : "65536",
        "lifecycle" : {
          "name" : "",
          "parse_origination_date" : "false",
          "indexing_complete" : "false",
          "rollover_alias" : "",
          "origination_date" : "-1"
        },
        "routing_partition_size" : "1",
        "force_memory_term_dictionary" : "false",
        "max_docvalue_fields_search" : "100",
        "merge" : {
          "scheduler" : {
            "max_thread_count" : "4",
            "auto_throttle" : "true",
            "max_merge_count" : "9"
          },
          "policy" : {
            "reclaim_deletes_weight" : "2.0",
            "floor_segment" : "2mb",
            "max_merge_at_once_explicit" : "30",
            "max_merge_at_once" : "10",
            "max_merged_segment" : "5gb",
            "expunge_deletes_allowed" : "10.0",
            "segments_per_tier" : "10.0",
            "deletes_pct_allowed" : "33.0"
          }
        },
        "max_refresh_listeners" : "1000",
        "max_regex_length" : "1000",
        "load_fixed_bitset_filters_eagerly" : "true",
        "number_of_routing_shards" : "1",
        "write" : {
          "wait_for_active_shards" : "1"
        },
        "verified_before_close" : "false",
        "mapping" : {
          "coerce" : "false",
          "nested_fields" : {
            "limit" : "50"
          },
          "depth" : {
            "limit" : "20"
          },
          "field_name_length" : {
            "limit" : "9223372036854775807"
          },
          "total_fields" : {
            "limit" : "1000"
          },
          "nested_objects" : {
            "limit" : "10000"
          },
          "ignore_malformed" : "false"
        },
        "source_only" : "false",
        "soft_deletes" : {
          "enabled" : "false",
          "retention" : {
            "operations" : "0"
          },
          "retention_lease" : {
            "period" : "12h"
          }
        },
        "max_script_fields" : "32",
        "query" : {
          "default_field" : [
            "*"
          ],
          "parse" : {
            "allow_unmapped_fields" : "true"
          }
        },
        "format" : "0",
        "frozen" : "false",
        "sort" : {
          "missing" : [ ],
          "mode" : [ ],
          "field" : [ ],
          "order" : [ ]
        },
        "priority" : "1",
        "codec" : "default",
        "max_rescore_window" : "10000",
        "max_adjacency_matrix_filters" : "100",
        "analyze" : {
          "max_token_count" : "10000"
        },
        "gc_deletes" : "60s",
        "top_metrics_max_size" : "10",
        "optimize_auto_generated_id" : "true",
        "max_ngram_diff" : "1",
        "hidden" : "false",
        "translog" : {
          "generation_threshold_size" : "64mb",
          "flush_threshold_size" : "512mb",
          "sync_interval" : "5s",
          "retention" : {
            "size" : "512MB",
            "age" : "12h"
          },
          "durability" : "REQUEST"
        },
        "mapper" : {
          "dynamic" : "true"
        },
        "recovery" : {
          "type" : ""
        },
        "requests" : {
          "cache" : {
            "enable" : "true"
          }
        },
        "data_path" : "",
        "highlight" : {
          "max_analyzed_offset" : "1000000"
        },
        "routing" : {
          "rebalance" : {
            "enable" : "all"
          },
          "allocation" : {
            "enable" : "all",
            "total_shards_per_node" : "-1"
          }
        },
        "search" : {
          "slowlog" : {
            "level" : "TRACE",
            "threshold" : {
              "fetch" : {
                "warn" : "-1",
                "trace" : "-1",
                "debug" : "-1",
                "info" : "-1"
              },
              "query" : {
                "warn" : "-1",
                "trace" : "-1",
                "debug" : "-1",
                "info" : "-1"
              }
            }
          },
          "idle" : {
            "after" : "30s"
          },
          "throttled" : "false"
        },
        "fielddata" : {
          "cache" : "node"
        },
        "default_pipeline" : "_none",
        "max_slices_per_scroll" : "1024",
        "shard" : {
          "check_on_startup" : "false"
        },
        "xpack" : {
          "watcher" : {
            "template" : {
              "version" : ""
            }
          },
          "version" : "",
          "ccr" : {
            "following_index" : "false"
          }
        },
        "percolator" : {
          "map_unmapped_fields_as_text" : "false"
        },
        "allocation" : {
          "max_retries" : "5",
          "existing_shards_allocator" : "gateway_allocator"
        },
        "indexing" : {
          "slowlog" : {
            "reformat" : "true",
            "threshold" : {
              "index" : {
                "warn" : "-1",
                "trace" : "-1",
                "debug" : "-1",
                "info" : "-1"
              }
            },
            "source" : "1000",
            "level" : "TRACE"
          }
        },
        "compound_format" : "0.1",
        "blocks" : {
          "metadata" : "false",
          "read" : "false",
          "read_only_allow_delete" : "false",
          "read_only" : "false"
        },
        "max_result_window" : "10000",
        "store" : {
          "stats_refresh_interval" : "10s",
          "type" : "",
          "fs" : {
            "fs_lock" : "native"
          },
          "preload" : [ ]
        },
        "queries" : {
          "cache" : {
            "enabled" : "true"
          }
        },
        "max_shingle_diff" : "3",
        "query_string" : {
          "lenient" : "false"
        }
      }
    }
  }
}

OK I will need to repeat some of my tests on 7.9.3

Wait a few minutes and see if that replica shard gets assigned....

Let me know... If not, that is definitely a (if not THE) problem.

It still says the replica shard is UNASSIGNED. However, the new node that I just provisioned is still in the process of rebalancing the cluster (which had 800GB on it)...

How long do I have to wait?

Oh... I did not know you were working with this much data; I would have tested on a much smaller set... hopefully this is not your production cluster.

You will probably need to wait until everything is rebalanced; how long that takes, I suspect, will be a while...

That setting:

        "auto_expand_replicas" : "0-all",

is what tells the .enrich-enrich_vessel-... index to expand its replicas..
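
As a general diagnostic (not specific to enrich indices), the allocation explain API can report why a replica is staying UNASSIGNED:

```
GET _cluster/allocation/explain
{
  "index": ".enrich-enrich_vessel-1609893468927",
  "shard": 0,
  "primary": false
}
```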