Enrich processor is slow on multiple nodes

Hello,

I am helping with a project that uses an ingest pipeline, specifically the enrich processor. The enrich processor pulls from a source index that is fairly small (300k records, ~25MB). The documents being enriched come in at a rate of about 2,500/s. On a single node this works fine, but when we add another node the throughput starts to decline until ingest stops working altogether. We do not have a deep enough understanding of what is happening under the hood to troubleshoot this, so we are looking for any help/suggestions to get this working across multiple nodes.
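For context, the setup is roughly the following (the policy, pipeline, and field names below are placeholders rather than our exact definitions): a match enrich policy over the source index, executed to build the enrich index, and an ingest pipeline with a single enrich processor.

PUT /_enrich/policy/vessel-policy
{
  "match": {
    "indices": "vess-000001",
    "match_field": "vessel_id",
    "enrich_fields": ["vessel_name", "flag"]
  }
}

POST /_enrich/policy/vessel-policy/_execute

PUT /_ingest/pipeline/vessel-enrich
{
  "processors": [
    {
      "enrich": {
        "policy_name": "vessel-policy",
        "field": "vessel_id",
        "target_field": "vessel"
      }
    }
  ]
}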

Cheers,

What are the settings of the "source index"? How many replicas? Adding replicas may speed things up when you have multiple nodes.

Here are the settings for the source index:

{
  "settings": {
    "index": {
      "creation_date": "1608237270261",
      "number_of_shards": "1",
      "number_of_replicas": "1",
      "uuid": "4wuW43OyQLql9UvXvsXLTQ",
      "version": {
        "created": "7090399"
      },
      "provided_name": "vess-000001"
    }
  },
  "defaults": {
    "index": {
      "flush_after_merge": "512mb",
      "final_pipeline": "_none",
      "max_inner_result_window": "100",
      "unassigned": {
        "node_left": {
          "delayed_timeout": "1m"
        }
      },
      "max_terms_count": "65536",
      "lifecycle": {
        "name": "",
        "parse_origination_date": "false",
        "indexing_complete": "false",
        "rollover_alias": "",
        "origination_date": "-1"
      },
      "routing_partition_size": "1",
      "force_memory_term_dictionary": "false",
      "max_docvalue_fields_search": "100",
      "merge": {
        "scheduler": {
          "max_thread_count": "4",
          "auto_throttle": "true",
          "max_merge_count": "9"
        },
        "policy": {
          "reclaim_deletes_weight": "2.0",
          "floor_segment": "2mb",
          "max_merge_at_once_explicit": "30",
          "max_merge_at_once": "10",
          "max_merged_segment": "5gb",
          "expunge_deletes_allowed": "10.0",
          "segments_per_tier": "10.0",
          "deletes_pct_allowed": "33.0"
        }
      },
      "max_refresh_listeners": "1000",
      "max_regex_length": "1000",
      "load_fixed_bitset_filters_eagerly": "true",
      "number_of_routing_shards": "1",
      "write": {
        "wait_for_active_shards": "1"
      },
      "verified_before_close": "false",
      "mapping": {
        "coerce": "false",
        "nested_fields": {
          "limit": "50"
        },
        "depth": {
          "limit": "20"
        },
        "field_name_length": {
          "limit": "9223372036854775807"
        },
        "total_fields": {
          "limit": "1000"
        },
        "nested_objects": {
          "limit": "10000"
        },
        "ignore_malformed": "false"
      },
      "source_only": "false",
      "soft_deletes": {
        "enabled": "false",
        "retention": {
          "operations": "0"
        },
        "retention_lease": {
          "period": "12h"
        }
      },
      "max_script_fields": "32",
      "query": {
        "default_field": [
          "*"
        ],
        "parse": {
          "allow_unmapped_fields": "true"
        }
      },
      "format": "0",
      "frozen": "false",
      "sort": {
        "missing": [],
        "mode": [],
        "field": [],
        "order": []
      },
      "priority": "1",
      "codec": "default",
      "max_rescore_window": "10000",
      "max_adjacency_matrix_filters": "100",
      "analyze": {
        "max_token_count": "10000"
      },
      "gc_deletes": "60s",
      "top_metrics_max_size": "10",
      "optimize_auto_generated_id": "true",
      "max_ngram_diff": "1",
      "hidden": "false",
      "translog": {
        "generation_threshold_size": "64mb",
        "flush_threshold_size": "512mb",
        "sync_interval": "5s",
        "retention": {
          "size": "512MB",
          "age": "12h"
        },
        "durability": "REQUEST"
      },
      "auto_expand_replicas": "false",
      "mapper": {
        "dynamic": "true"
      },
      "recovery": {
        "type": ""
      },
      "requests": {
        "cache": {
          "enable": "true"
        }
      },
      "data_path": "",
      "highlight": {
        "max_analyzed_offset": "1000000"
      },
      "routing": {
        "rebalance": {
          "enable": "all"
        },
        "allocation": {
          "enable": "all",
          "total_shards_per_node": "-1"
        }
      },
      "search": {
        "slowlog": {
          "level": "TRACE",
          "threshold": {
            "fetch": {
              "warn": "-1",
              "trace": "-1",
              "debug": "-1",
              "info": "-1"
            },
            "query": {
              "warn": "-1",
              "trace": "-1",
              "debug": "-1",
              "info": "-1"
            }
          }
        },
        "idle": {
          "after": "30s"
        },
        "throttled": "false"
      },
      "fielddata": {
        "cache": "node"
      },
      "default_pipeline": "_none",
      "max_slices_per_scroll": "1024",
      "shard": {
        "check_on_startup": "false"
      },
      "xpack": {
        "watcher": {
          "template": {
            "version": ""
          }
        },
        "version": "",
        "ccr": {
          "following_index": "false"
        }
      },
      "percolator": {
        "map_unmapped_fields_as_text": "false"
      },
      "allocation": {
        "max_retries": "5",
        "existing_shards_allocator": "gateway_allocator"
      },
      "refresh_interval": "1s",
      "indexing": {
        "slowlog": {
          "reformat": "true",
          "threshold": {
            "index": {
              "warn": "-1",
              "trace": "-1",
              "debug": "-1",
              "info": "-1"
            }
          },
          "source": "1000",
          "level": "TRACE"
        }
      },
      "compound_format": "0.1",
      "blocks": {
        "metadata": "false",
        "read": "false",
        "read_only_allow_delete": "false",
        "read_only": "false",
        "write": "false"
      },
      "max_result_window": "10000",
      "store": {
        "stats_refresh_interval": "10s",
        "type": "",
        "fs": {
          "fs_lock": "native"
        },
        "preload": []
      },
      "queries": {
        "cache": {
          "enabled": "true"
        }
      },
      "warmer": {
        "enabled": "true"
      },
      "max_shingle_diff": "3",
      "query_string": {
        "lenient": "false"
      }
    }
  }
}
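If more replicas are worth trying, I assume it would just be a settings update like this (replica count picked arbitrarily):

PUT /vess-000001/_settings
{
  "index": {
    "number_of_replicas": 2
  }
}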

Are more replicas going to increase or decrease complexity for the enrich processor?

It shouldn't really matter, as replication happens after the pipeline runs.

Any ideas what could be slowing down the pipeline on multiple nodes? How often is the enrich index created/updated? Could the force_merge that runs when the enrich index is created be slowing the pipeline across multiple nodes?

I suspect something unusual is happening (captain obvious here).

So yes, how often is the lookup index created, updated, deleted, etc.? Is it constant, or is it fairly static?

Why are you force merging it? It's a tiny ... How often are you doing that?

Without the enrich processor, does ingest work/scale on multiple nodes?

Are we even sure it's the enrich processor?
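One way to confirm might be to look at the per-pipeline and per-processor timings in the node ingest stats, for example:

GET /_nodes/stats/ingest?filter_path=nodes.*.ingest

If the enrich processor's time and count numbers jump on the nodes once the second node joins, that would point at it.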

I was reading the docs for the enrich processor. They say the enrich index (which is managed by the system) is force merged when it is created (https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ingest-enriching-data.html). The docs do not say how often the enrich index is created/updated. The creation date on the index says midnight, and it has been running for two weeks, so I would guess daily.
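For reference, I was checking the creation dates of the system enrich indices with something like this (column names assumed; they may vary by version):

GET /_cat/indices/.enrich-*?v&h=index,creation.date.string,docs.count,store.size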

The ingest does work with the enrich processor, but only on a single node.

Hey guys... I am working with Mike on this...

We have a Lambda function executing the enrich policy daily. This keeps the enrich index fresh, as the source index is constantly being updated. When I remove the enrich processor, we are able to ingest successfully with multiple nodes (even across AZs).
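The Lambda essentially just calls the execute-policy API on a schedule, roughly like this (policy name is a placeholder; wait_for_completion=false keeps the call from blocking while the new enrich index is built and force merged, if your version supports the parameter):

POST /_enrich/policy/vessel-policy/_execute?wait_for_completion=false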

This issue is related to a thread I started here: ES Cloud - Running ingest pipelines on warm nodes?

It's going to be easier if we can keep a single thread with all the info. Let's carry on here please :slight_smile:
