Unknown error while bulk indexing

I'm getting some errors during bulk indexing that I can't seem to extract the reason for. Below is my code. Do you see any issues? I feel like this should be pulling out the reason, given the explanation in the docs.

async def bulk_index_to_elasticsearch(documents, index):
    try:
        async with await get_elasticsearch_client() as es:
            RETRY_ON_TIMEOUT = 3
            total_successes = 0
            errors = []
            failed_docs = []

            for doc in documents:
                action = {
                    "_op_type": "create",
                    "_index": index,
                    "_id": doc["_id"],  # Using "_id" for Elasticsearch document ID
                    "_source": {key: value for key, value in doc.items() if key != "_id"},  # Everything except "_id" goes into the source
                    "pipeline": "ml-inference-master"  # Update if you have a different pipeline
                }

                for attempt in range(RETRY_ON_TIMEOUT):
                    try:
                        successes, bulk_errors = await helpers.async_bulk(es, [action], timeout="5m", raise_on_error=False)
                        total_successes += successes
                        if bulk_errors:
                            for error in bulk_errors:
                                error_details = error.get('error', {})
                                error_reason = error_details.get('reason', 'Unknown Error')

                                # Append the error reason to the errors list
                                errors.append(f"Error for document ID {doc['articleUrl']}: {error_reason}")
                                failed_docs.append(doc)  # Add the document to the failed_docs list

                        break  # Break the retry loop if successful or if an error other than timeout occurred
                    except ConnectionTimeout as e:
                        if attempt < RETRY_ON_TIMEOUT - 1:
                            logging.info(f"Timeout for document {doc['articleUrl']}. Retrying {attempt + 1}/{RETRY_ON_TIMEOUT}...")
                            continue
                        else:
                            errors.append(f"Connection timed out for document {doc['articleUrl']} after {RETRY_ON_TIMEOUT} attempts")
                            break
                    except Exception as e:
                        errors.append(f"Exception for document {doc['articleUrl']}: {str(e)}")
                        failed_docs.append(doc)
                        break
            if failed_docs:
                write_errors_to_csv(failed_docs)

        return total_successes, errors
    except Exception as e:
        logging.error(f"An error occurred in bulk_index_to_elasticsearch: {e}")
        return 0, [str(e)]

Hi @mmaccou,

Which version of Elasticsearch and the language client are you using? Can you also share the error you are receiving?

Hi Carly. The version is v8.11.2. There is no error reason or details. My logs show "Unknown Error", which is my default. I can't identify anything strange with the data that is failing, so it's kind of a mystery. It's definitely possible my code is wrong, but I haven't been able to figure out how.
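
One likely cause, judging from the code above: with raise_on_error=False, each entry in the errors list returned by helpers.async_bulk is normally wrapped in a single key named after the operation type ("create" here). Calling error.get('error', {}) on that outer dict would then return {}, so the 'Unknown Error' default is always used. Below is a minimal sketch of unwrapping the item under that assumption. The function name extract_bulk_error and the sample item are hypothetical and made up for illustration; they are not part of the client, and the real payload should be checked by logging one raw entry from bulk_errors.

```python
def extract_bulk_error(item):
    """Return (op_type, doc_id, status, reason) for one entry of the errors list.

    Assumes the entry is a single-key dict such as
    {"create": {"_id": ..., "status": ..., "error": {...}}}.
    """
    # The only key is the operation type; the details live one level down.
    op_type, details = next(iter(item.items()))
    error = details.get("error", {})

    if isinstance(error, dict):
        reason = error.get("reason", "Unknown Error")
        # Ingest pipeline and mapping failures often nest the root cause.
        caused_by = error.get("caused_by")
        if isinstance(caused_by, dict) and caused_by.get("reason"):
            reason = f"{reason} (caused by: {caused_by['reason']})"
    else:
        # Some failures are reported as a plain string rather than a dict.
        reason = str(error)

    return op_type, details.get("_id"), details.get("status"), reason


# Made-up sample entry, shaped like a failed "create" on an existing ID.
sample = {
    "create": {
        "_index": "articles",
        "_id": "1",
        "status": 409,
        "error": {
            "type": "version_conflict_engine_exception",
            "reason": "document already exists",
        },
    }
}

print(extract_bulk_error(sample))
```

In the loop from the original post, this would replace the two .get() calls, e.g. op_type, doc_id, status, reason = extract_bulk_error(error). Logging the status as well is worthwhile: with "_op_type": "create", a 409 usually just means a document with that _id already exists.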
