Re-index using Machine Learning with a custom trained model

We currently have a production Elasticsearch cluster on Elastic Cloud and want to experiment with vector search using machine learning and a custom trained model.

Machine Learning

  • Configuration: 32 GB RAM | 16.9 vCPU
  • An ingest pipeline is defined to run inference on a large text field (the product description)
  • Vector dimension: 768
  • Trained Model:
    • Number of allocations: 18 (Could not go higher)
    • Threads per allocation: 1 (Could not go higher)

Problem:

  • When I start an async re-index to populate the new dense_vector embedding field, the process starts but hangs shortly after.

Here is what I did:

  • Reduced the batch size to 25 (default is 1,000) and set "requests_per_second=10", so there is a wait time between batches (the request is sketched below).
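
    Roughly, the throttled async re-index call looks like this (index and pipeline names are the ones that appear in the task info later in this thread):

    POST _reindex?wait_for_completion=false&requests_per_second=10
    {
      "source": {
        "index": "products_en_ca",
        "size": 25
      },
      "dest": {
        "index": "products_en_ca_bdo2",
        "pipeline": "product-search-vector-bdo"
      }
    }

    The returned task id can then be polled with GET _tasks/<task id>.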

More information:

  • The re-index runs for roughly 3K docs and then hangs.
  • The task does not cancel on error.
  • Need to kill the task.
  • Need to re-start the Machine Learning model.
  • I could move to a bigger machine, but this is not the smallest one and it is already fairly expensive.

What is needed:

  • Need a way to see the machine learning logs and understand what is going on.
  • How can we re-index without overloading the machine?
  • Trained Model - How much memory is taken by each of the following:
    • Number of allocations?
    • Threads per allocation?

Before we dive into the problem, do you mind sharing a few things:

  • which Elasticsearch version are you on?
  • when it hangs, can you run GET _ml/trained_models/<your model>/_stats? It should tell you the failure reason; please share it here.
  • meanwhile, have you tried the tip "Set the reindex size option to a value smaller than the queue_capacity for the trained model deployment. Otherwise, requests might be rejected with a 'too many requests' 429 error code." from this page? (A quick way to check the current queue_capacity is sketched below.)
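
    For reference, the current queue_capacity shows up in the same stats call (replace <your model> with your deployment ID):

    GET _ml/trained_models/<your model>/_stats

    # In the response, check:
    #   trained_model_stats[].deployment_stats.queue_capacity   (default is 1024)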

More information:

  1. Using the latest Elasticsearch version: 8.8.1.

  2. Machine Learning - Trained Model: I tried the following (it looks like the magic number is 18):
    a. Number of allocations: 9 (Before: 18)
    b. Threads per allocation: 2 (Before: 1)
    c. It went well for a short while and then hung:

    Task Information:
     {
       "completed": false,
       "task": {
     	"node": "jUvcJZ0PR0uytvtvHL8o7g",
     	"id": 12215573,
     	"type": "transport",
     	"action": "indices:data/write/reindex",
     	"status": {
     	  "total": 9394509,
     	  "updated": 0,
     	  "created": 13952,
     	  "deleted": 0,
     	  "batches": 787,
     	  "version_conflicts": 5698,
     	  "noops": 0,
     	  "retries": {
     		"bulk": 0,
     		"search": 0
     	  },
     	  "throttled_millis": 0,
     	  "requests_per_second": -1,
     	  "throttled_until_millis": 0
     	},
     	"description": "reindex from [products_en_ca] to [products_en_ca_bdo2]",
     	"start_time_in_millis": 1687524933896,
     	"running_time_in_nanos": 1393859398089,
     	"cancellable": true,
     	"cancelled": false,
     	"headers": {
     	  "trace.id": "94785796642efca566b9f213fc353ddb"
     	}
       }
     }
    
     Trained Model Stats:
     {
       "count": 1,
       "trained_model_stats": [
     	{
     	  "model_id": "hugging_face_model",
     	  "model_size_stats": {
     		"model_size_bytes": 669494243,
     		"required_native_memory_bytes": 1590646726
     	  },
     	  "pipeline_count": 1,
     	  "ingest": {
     		"total": {
     		  "count": 45906,
     		  "time_in_millis": 2545095017,
     		  "current": 1,
     		  "failed": 0
     		},
     		"pipelines": {
     		  "product-search-vector-bdo": {
     			"count": 45906,
     			"time_in_millis": 2545095017,
     			"current": 1,
     			"failed": 0,
     			"processors": [
     			  {
     				"set": {
     				  "type": "set",
     				  "stats": {
     					"count": 45907,
     					"time_in_millis": 390,
     					"current": 0,
     					"failed": 0
     				  }
     				}
     			  },
     			  {
     				"inference": {
     				  "type": "inference",
     				  "stats": {
     					"count": 45906,
     					"time_in_millis": 2545094486,
     					"current": 1,
     					"failed": 328
     				  }
     				}
     			  }
     			]
     		  }
     		}
     	  },
     	  "inference_stats": {
     		"failure_count": 0,
     		"inference_count": 19674,
     		"cache_miss_count": 0,
     		"missing_all_fields_count": 0,
     		"timestamp": 1687526806543
     	  },
     	  "deployment_stats": {
     		"deployment_id": "hugging_face_model",
     		"model_id": "hugging_face_model",
     		"threads_per_allocation": 2,
     		"number_of_allocations": 9,
     		"queue_capacity": 1024,
     		"state": "started",
     		"allocation_status": {
     		  "allocation_count": 9,
     		  "target_allocation_count": 9,
     		  "state": "fully_allocated"
     		},
     		"cache_size": "638.4mb",
     		"priority": "normal",
     		"start_time": 1687524865801,
     		"inference_count": 19674,
     		"peak_throughput_per_minute": 1929,
     		"nodes": [
     		  {
     			"node": {
     			  "ggo3sgrGS6KeJ86C7BPhqw": {
     				"name": "instance-0000000015",
     				"ephemeral_id": "VPylTl2kQlmU8AttV_jzDA",
     				"transport_address": "172.18.128.46:19193",
     				"external_id": "instance-0000000015",
     				"attributes": {
     				  "ml.allocated_processors": "18",
     				  "availability_zone": "us-east-2a",
     				  "server_name": "instance-0000000015.d0a9eb97ab874c5abfbc8aa70c9b6009",
     				  "logical_availability_zone": "zone-0",
     				  "ml.max_jvm_size": "8589934592",
     				  "region": "us-east-2",
     				  "ml.machine_memory": "34359738368",
     				  "ml.allocated_processors_double": "18.0",
     				  "xpack.installed": "true",
     				  "instance_configuration": "aws.es.ml.c5d"
     				},
     				"roles": [
     				  "ml",
     				  "remote_cluster_client"
     				],
     				"version": "8.8.1"
     			  }
     			},
     			"routing_state": {
     			  "routing_state": "started"
     			},
     			"inference_count": 19674,
     			"average_inference_time_ms": 223.29195893056826,
     			"average_inference_time_ms_excluding_cache_hits": 215.96183706943685,
     			"inference_cache_hit_count": 1384,
     			"last_access": 1687525808124,
     			"number_of_pending_requests": 1,
     			"start_time": 1687524866710,
     			"threads_per_allocation": 2,
     			"number_of_allocations": 9,
     			"peak_throughput_per_minute": 1929,
     			"throughput_last_minute": 0,
     			"inference_cache_hit_count_last_minute": 0
     		  }
     		]
     	  }
     	}
       ]
     }
    

Thanks for the info.

From the trained model stats API, your deployment shows "state": "started", which tells us the model itself is working fine. Therefore, you most likely hit the queue_capacity limit.

Here are some suggestions you can try:

  1. While reindexing, set a smaller batch size; I would recommend 50. The benefit of a small size value is that if you have multiple bulk uploads going through ingest pipelines that use the same model deployment, they all share the same queue, so in your case the queue fills up quickly with the default reindex batch size (1,000).
    The reindex command should look like this:

    POST _reindex?wait_for_completion=false
    {
      "source": {
        "index": "products_en_ca",
        "size": 50
      },
      "dest": {
        "index": "products_en_ca_bdo2",
        "pipeline": "product-search-vector-bdo"
      }
    }
    
  2. It looks like you only have one ML node. Multiple allocations are a better fit for multiple ML nodes. Instead of using "9 allocations x 2 threads", please use "1 allocation x X threads". I would start with "1 allocation x 10 threads" in your case.

  3. Handle pipeline failures in your pipeline configuration. Your pipeline configuration will look like this:

    {
            "on_failure": [
              {
                "set": {
                  "description": "Record error information",
                  "field": "error_information",
                  "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
                }
              }
            ],
            "processors": [
              {
                "inference": {
                  "ignore_failure": false,
                  "model_id": "hugging_face_model",
                  ...
                }
              }
            ]
          }
    

    This way, the reindex task won't hang when it hits errors; instead, it records the error information in the destination index and continues. After the reindex is done, you can run the query below to see the failure details:

    GET products_en_ca_bdo2/_search
    {
      "query": {
        "exists": {
          "field": "error_information"
        }
      }
    }
    

If none of the above helps in your case, since you mentioned you are running on Elastic Cloud, feel free to create a support case and we will take a deeper look at your cluster logs.

Hope it helps.

Thanks for your help.

I am currently on Elastic Cloud. I changed the pipeline (error handling) and the threading, and things look better.

Machine Learning

  1. My choices for threads are 1 - 2 - 4 - 8, so I took 8.
  2. Allocations: using 2 works (but not higher). Should I use 1, since more will not improve anything?
  3. Can I downgrade the machine (32 GB RAM | 16.9 vCPU), or is that risky?

FYI: I first created a case with support, but I got better support here! Case: 01384117

I will continue the re-index and see how it goes.

Have a nice day! :slight_smile:

Good day,

  1. My choices for threads are 1 - 2 - 4 - 8, so I took 8.
  2. Allocations: using 2 works (but not higher). Should I use 1, since more will not improve anything?

I just realized I had a typo in my previous reply regarding the thread count, sorry.

What I meant to start with is: 1 allocation x 8 threads, then 1 allocation x 16 threads ... this page can help understand the concept of allocations & threads. Since you have one single ml node, more thread should get better performance than more allocations. Because I don't know if there are other ml features or models sharing the same ml node resource, that's why 8 threads were recommended to start with. For using more than 8 threads, you might have to use api or dev tools to send the request.
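
A sketch of sending that through Dev Tools, assuming the deployment ID "hugging_face_model" from the stats above (threads_per_allocation must be a power of 2):

    # Stop the current deployment; add ?force=true if an ingest pipeline still references it
    POST _ml/trained_models/hugging_face_model/deployment/_stop

    # Restart with a single allocation and more threads per allocation
    POST _ml/trained_models/hugging_face_model/deployment/_start?number_of_allocations=1&threads_per_allocation=16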

  1. Can I downgrade the machine (32 GB RAM | 16.9 vCPU), or is that risky?

There is no risk; the trade-off is the overall inference/reindex time. Before downgrading, don't forget to stop your model first. After the downgrade is done, you can re-start the model with the appropriate thread settings (a quick health check is sketched below).
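
After restarting, the same stats call from earlier in this thread is a quick way to confirm the deployment came back healthy:

    GET _ml/trained_models/hugging_face_model/_stats

    # Expect deployment_stats.state = "started" and
    # deployment_stats.allocation_status.state = "fully_allocated"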

Feel free to add your questions or results to your existing support case; our support team will coordinate with us and provide you with the best solution.

Cheers.
