Python bulk index update not indexing all documents

We're bulk indexing documents with this function:

import json
from itertools import islice

from elasticsearch import ApiError, helpers

# ElasticJsonEncoder, MAX_BULK_BYTES_SIZE and get_client() are defined elsewhere in our code base.
async def bulk_update_index(index: str, docs: dict[str, dict]):
    actions = [
        {
            "_index":   index,
            "_op_type": "index",
            "_id":      doc_id,
            "_source":  json.dumps(doc, cls=ElasticJsonEncoder),
        }
        for doc_id, doc in docs.items()
    ]
    max_tries = 10
    for batched_actions in batched_by_bytes(actions):
        for i in range(max_tries):
            try:
                success, errors = await helpers.async_bulk(client=get_client(), actions=batched_actions)
                if errors:
                    print(f"ERROR\tElastic Bulk - Partial errors on index:\n{errors}")
                break
            except ApiError as api_e:
                print(f"ERROR\tElasticsearch API error while updating index:\n{api_e!r}")
            except Exception as e:
                print(f"ERROR\tException while bulk updating index - tries:{i}/{max_tries}\n{e!r}")

def batched_by_bytes(actions: list, batch_size: int = 50):
    stringified_actions = json.dumps(actions)
    if len(stringified_actions.encode("utf-8")) > MAX_BULK_BYTES_SIZE:
        it = iter(actions)
        while batch := tuple(islice(it, batch_size)):
            yield batch
    else:
        yield actions

The problem arises when we check whether all documents have been indexed in Elastic, and they're not: some documents were indexed but others weren't. Over time, the missing documents show up after hours or even days.

We're using the batched_by_bytes function to make sure that the actions never exceed Elastic's 100 MB bulk request limit.
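
For reference, this is roughly what a byte-aware split would look like. It is only a sketch (batched_by_byte_size is a made-up name, and MAX_BULK_BYTES_SIZE is the same constant used above), since splitting into fixed batches of 50 items doesn't strictly bound the request size:

# Sketch: split actions by accumulated serialized size instead of a fixed item count.
# A single action larger than max_bytes would still be yielded on its own.
def batched_by_byte_size(actions: list, max_bytes: int = MAX_BULK_BYTES_SIZE):
    batch, batch_bytes = [], 0
    for action in actions:
        action_bytes = len(json.dumps(action).encode("utf-8"))
        if batch and batch_bytes + action_bytes > max_bytes:
            yield batch  # current batch would overflow, flush it first
            batch, batch_bytes = [], 0
        batch.append(action)
        batch_bytes += action_bytes
    if batch:
        yield batch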

We've tried to use an Ingest node and the result was even worse: searches were ultra-slow and no documents were indexed at all. We also tried using the bulk indexer's refresh option to refresh the index on every bulk, but it did not help at all; it only slowed down the indexing speed.

Is there any way we can make sure that all the documents are indexed correctly, and make the process crash if they are not?

This process runs continuously, as documents are streamed in every second.
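
To illustrate what we mean by "crash if not", something roughly like this would be ideal (only a sketch: build_actions stands in for the action-building code above, and we haven't verified that relying on raise_on_error / raise_on_exception this way is the right approach):

# Sketch: fail hard when any document in the batch is rejected.
async def bulk_update_index_strict(index: str, docs: dict[str, dict]):
    actions = build_actions(index, docs)  # hypothetical helper: same action dicts as above
    success, errors = await helpers.async_bulk(
        client=get_client(),
        actions=actions,
        raise_on_error=True,       # raise BulkIndexError if any document is rejected
        raise_on_exception=True,   # raise transport/API errors instead of swallowing them
    )
    if success != len(actions):
        raise RuntimeError(f"only {success}/{len(actions)} documents were indexed")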

Which version of Elasticsearch are you using?

Does Elasticsearch report any errors in the bulk response? If so, what is reported?

We’re working with version 8.5.1.

With small batches there are no errors, but the speed is not enough to empty the Kafka queue where the data waits to be indexed, and no documents are indexed.

With larger batches, we get some BulkIndexError exceptions (“1 document was not indexed”) and occasional API 413 errors (Request Entity Too Large).

We’re working on optimizing the batch division so that we split as little as possible, to increase speed and resolve some of the 413 errors. For the BulkIndexError we don’t have any clue what the cause might be, as that is the only error message Elastic returns.
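
If BulkIndexError keeps the raw per-document bulk items on its errors attribute, something like this should show which document failed and why (a sketch only, reusing helpers and get_client from the code above):

from elasticsearch.helpers import BulkIndexError

async def bulk_index_with_error_details(batched_actions: list):
    # Sketch: print the per-document failure reasons hidden behind
    # "N document(s) failed to index.", then re-raise so the process still stops.
    try:
        await helpers.async_bulk(client=get_client(), actions=batched_actions)
    except BulkIndexError as bulk_e:
        for item in bulk_e.errors:
            # each item is keyed by the op type, e.g. {"index": {"_id": ..., "status": ..., "error": {...}}}
            op, details = next(iter(item.items()))
            print(f"ERROR\t{op} failed for _id={details.get('_id')}: {details.get('error')}")
        raise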

Hi @Mario_Betterplace,

When I got this error in the past, I was able to resolve it by adding a few lines to pinpoint where the issue was. This example is from the code I used to create this blog post.

# response[1] is the list of per-document errors returned by the bulk helper
if response[1]:
    for idx, item in enumerate(response[1]):
        if item['index']['status'] != 201:
            print(f"Document with GAME_ID {celtics_current_season.iloc[idx]['GAME_ID']} failed to index.")
            print(f"Error: {item['index']['error']}")

I also found that sometimes, when these errors occur, the issue is caused by null values. To check whether my dataset contains null values, I can run the following line of code:

celtics_current_season.isnull().values.any()

If the response I get back is True, I can create a copy of my DataFrame and fill in the null values:

celtics_current_season_copy = celtics_current_season.copy()
celtics_current_season_copy.fillna(0, inplace=True)

The full code from this example can be found here.

Hope this helps!