When using Elasticsearch's Inference API and Inference Processor to vectorize and index text (with Alibaba's text embedding model, added in PR: [Inference API] Add Alibaba Cloud AI Search Model support to Inference API #111181), we set the bulk batch size to 100. However, this immediately triggered rate-limit errors from the Alibaba text_embedding interface. The rate limit for this interface is 50 QPS, and the interface accepts multiple documents per request in an array (with a maximum batch size of 32), so its theoretical throughput is 50 * 32 = 1600 documents per second.
However, because we write through the Inference Processor, Elasticsearch issues a separate inference request for each document, which caps the write rate at 50 documents per second.
I would like to know whether there is a good way to address this, for example by having the Inference Processor batch documents into a single inference call instead of making an individual inference API call per document.
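For reference, our setup looks roughly like the sketch below. The endpoint and pipeline names, credentials, and hosts are placeholders, and the request bodies are reconstructed from our configuration, so please double-check the field names against the Inference API and inference processor docs for your Elasticsearch version:

import requests

ES_URL = "https://your-es-host:9200"    # placeholder Elasticsearch endpoint
ES_AUTH = ("elastic", "your_password")  # placeholder credentials

# 1. Inference endpoint backed by the Alibaba Cloud AI Search embedding service
requests.put(
    f"{ES_URL}/_inference/text_embedding/alibaba_embeddings",
    auth=ES_AUTH,
    json={
        "service": "alibabacloud-ai-search",
        "service_settings": {
            "api_key": "your_api_key_here",
            "service_id": "ops-text-embedding-001",
            "host": "default-j01.platform-cn-shanghai.opensearch.aliyuncs.com",
            "workspace": "default",
        },
    },
)

# 2. Ingest pipeline whose inference processor calls that endpoint once per document
requests.put(
    f"{ES_URL}/_ingest/pipeline/alibaba_embeddings_pipeline",
    auth=ES_AUTH,
    json={
        "processors": [
            {
                "inference": {
                    "model_id": "alibaba_embeddings",
                    "input_output": [
                        {"input_field": "text", "output_field": "embedding"}
                    ],
                }
            }
        ]
    },
)

# 3. Bulk write with a batch size of 100; each document still results in its own
#    inference call to Alibaba, which is what runs into the 50 QPS limit
bulk_body = ""
for i in range(100):
    bulk_body += '{"index": {"_index": "your_index_name"}}\n'
    bulk_body += f'{{"text": "sample text {i}"}}\n'

requests.post(
    f"{ES_URL}/_bulk?pipeline=alibaba_embeddings_pipeline",
    auth=ES_AUTH,
    headers={"Content-Type": "application/x-ndjson"},
    data=bulk_body,
)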
import requests
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Configuration for Alibaba Cloud AI Search
ALIBABA_HOST = "default-j01.platform-cn-shanghai.opensearch.aliyuncs.com"
API_KEY = "your_api_key_here"
WORKSPACE = "default"
SERVICE_ID = "ops-text-embedding-001"

def batch_inference(documents, batch_size=32):
    url = f"https://{ALIBABA_HOST}/inference/{SERVICE_ID}"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]
    results = []
    for batch in batches:
        payload = {
            "documents": batch
        }
        response = requests.post(url, json=payload, headers=headers)
        if response.status_code == 200:
            results.extend(response.json()['results'])
        else:
            # A failed batch would leave 'results' shorter than 'documents' and
            # misalign the embeddings below, so fail fast here.
            # Consider implementing retry logic here based on response status or content.
            raise RuntimeError(f"Failed to process batch: {response.text}")
    return results

# Assume we have a list of documents to process
documents = [
    {"text": "sample text 1"},
    {"text": "sample text 2"},
    # ... up to 100 documents ...
]

# Perform batch inference
embeddings = batch_inference(documents)

# Connect to Elasticsearch (adjust the host to your cluster)
es = Elasticsearch("http://localhost:9200")

# Prepare documents for bulk indexing with embeddings
actions = []
for i, doc in enumerate(documents):
    action = {
        "_index": "your_index_name",
        "_source": {
            "text": doc['text'],
            "embedding": embeddings[i]['embedding']  # Assuming 'embedding' is the key for the vector data
        }
    }
    actions.append(action)

# Bulk index the documents
success, _ = bulk(es, actions)
if success:
    print(f"Successfully indexed {success} documents.")
else:
    print("Failed to index documents.")
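One way to implement the retry logic mentioned in the error branch above, while also throttling requests to stay under the 50 QPS limit, is sketched below. It reuses ALIBABA_HOST, API_KEY and SERVICE_ID from the snippet above, and the retry counts and backoff values are arbitrary choices, not values from Alibaba's documentation:

import time
import requests

def batch_inference_throttled(documents, batch_size=32, max_qps=50, max_retries=3):
    """Batch inference with client-side throttling and simple exponential backoff."""
    url = f"https://{ALIBABA_HOST}/inference/{SERVICE_ID}"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    min_interval = 1.0 / max_qps  # never send more than max_qps requests per second
    results = []
    last_call = 0.0
    for start in range(0, len(documents), batch_size):
        batch = documents[start:start + batch_size]
        for attempt in range(max_retries):
            # Space requests out to respect the rate limit.
            wait = min_interval - (time.monotonic() - last_call)
            if wait > 0:
                time.sleep(wait)
            last_call = time.monotonic()

            response = requests.post(url, json={"documents": batch}, headers=headers)
            if response.status_code == 200:
                results.extend(response.json()['results'])
                break

            # Back off before retrying (rate limit or transient error).
            time.sleep(2 ** attempt)
        else:
            raise RuntimeError(f"Giving up on batch starting at document {start}: {response.text}")
    return results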
Thanks a lot! We have already tried this batch-processing approach; however, it doesn't let us use ingest pipelines for convenient data writing. We are still looking for better options, such as ways to optimize how the pipeline itself handles inference.