I'm getting some errors during bulk indexing that I cant seem to extract the reason for. Below is my code. Do you see any issues? I feel like this should be pulling out the reason given the explanation in the docs.
async def bulk_index_to_elasticsearch(documents, index):
try:
async with await get_elasticsearch_client() as es:
RETRY_ON_TIMEOUT = 3
total_successes = 0
errors = []
failed_docs = []
for doc in documents:
action = {
"_op_type": "create",
"_index": index,
"_id": doc["_id"], # Using "_id" for Elasticsearch document ID
"_source": {key: value for key, value in doc.items() if key != "_id"}, # Include "id" in the source
"pipeline": "ml-inference-master" # Update if you have a different pipeline
}
for attempt in range(RETRY_ON_TIMEOUT):
try:
successes, bulk_errors = await helpers.async_bulk(es, [action], timeout="5m", raise_on_error=False)
total_successes += successes
if bulk_errors:
for error in bulk_errors:
error_details = error.get('error', {})
error_reason = error_details.get('reason', 'Unknown Error')
# Append the error reason to the errors list
errors.append(f"Error for document ID {doc['articleUrl']}: {error_reason}")
failed_docs.append(doc) # Add the document to the failed_docs list
break # Break the retry loop if successful or if an error other than timeout occurred
except ConnectionTimeout as e:
if attempt < RETRY_ON_TIMEOUT - 1:
logging.info(f"Timeout for document {doc['articleUrl']}. Retrying {attempt + 1}/{RETRY_ON_TIMEOUT}...")
continue
else:
errors.append(f"Connection timed out for document {doc['articleUrl']} after {RETRY_ON_TIMEOUT} attempts")
break
except Exception as e:
errors.append(f"Exception for document {doc['articleUrl']}: {str(e)}")
failed_docs.append(doc)
break
if failed_docs:
write_errors_to_csv(failed_docs)
return total_successes, errors
except Exception as e:
logging.error(f"An error occurred in bulk_index_to_elasticsearch: {e}")
return 0, [str(e)]