Python bulk index update not indexing all documents

We're bulk indexing documents with this function:

import json
from itertools import islice

from elasticsearch import ApiError, helpers

async def bulk_update_index(index: str, docs: dict[str, dict]):
    actions = [
        {
            "_index": index,
            "_op_type": "index",
            "_id": doc_id,
            # ElasticJsonEncoder is our custom JSON encoder for these documents
            "_source": json.dumps(doc, cls=ElasticJsonEncoder),
        }
        for doc_id, doc in docs.items()
    ]
    max_tries = 10
    for batched_actions in batched_by_bytes(actions):
        for i in range(max_tries):
            try:
                success, errors = await helpers.async_bulk(client=get_client(), actions=batched_actions)
                if errors:
                    print(f"ERROR\tElastic Bulk - Partial errors on index:\n{errors}")
                break
            except ApiError as api_e:
                print(f"ERROR\tElasticsearch API error while updating index:\n{api_e!r}")
            except Exception as e:
                print(f"ERROR\tException while bulk updating index - tries:{i}/{max_tries}\n{e!r}")

def batched_by_bytes(actions: list, batch_size: int = 50):
    stringified_actions = json.dumps(actions)
    if len(stringified_actions.encode("utf-8")) > MAX_BULK_BYTES_SIZE:
        it = iter(actions)
        while batch := tuple(islice(it, batch_size)):
            yield batch
    else:
        yield actions

The problem arises when we check whether all documents have been indexed in Elasticsearch: some documents were indexed but others were not. Over time, the missing documents appear after hours or even days.

We're using the batched_by_bytes function to make sure the actions never exceed Elasticsearch's 100 MB bulk request limit.
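Note that batched_by_bytes as written splits by a fixed count of 50 actions rather than by bytes, so a batch of 50 very large documents can still exceed the limit. A minimal sketch of splitting by actual serialized size instead, assuming the same MAX_BULK_BYTES_SIZE constant (the function name is illustrative, not from our real code):

import json
from typing import Iterator

def batched_by_actual_bytes(actions: list[dict]) -> Iterator[list[dict]]:
    # Accumulate actions until adding the next one would push the
    # serialized payload over the limit, then start a new batch.
    batch: list[dict] = []
    batch_bytes = 0
    for action in actions:
        action_bytes = len(json.dumps(action).encode("utf-8"))
        if batch and batch_bytes + action_bytes > MAX_BULK_BYTES_SIZE:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(action)
        batch_bytes += action_bytes
    if batch:
        yield batch

This keeps every batch under the limit regardless of individual document size.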

We've tried using an ingest node, and the result was even worse: searches were ultra-slow and no documents were indexed at all. We also tried using the bulk refresh option to refresh the index on every bulk request, but it did not help at all; it only slowed down the indexing speed.

Is there any way we can make sure that the documents are all indexed correctly and crash if not?

This process is continuous as it's streaming documents every second.
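For reference, helpers.async_bulk raises BulkIndexError for failed items when raise_on_error=True, which is the default. A minimal sketch of failing hard instead of logging and continuing (the wrapper name is illustrative):

from elasticsearch import helpers
from elasticsearch.helpers import BulkIndexError

async def bulk_or_crash(client, actions):
    try:
        # raise_on_error=True (the default) turns any failed item into BulkIndexError
        return await helpers.async_bulk(client, actions, raise_on_error=True)
    except BulkIndexError as e:
        # e.errors holds the per-item error dicts returned by Elasticsearch
        for item in e.errors:
            print(f"ERROR\tFailed item: {item}")
        raise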

Which version of Elasticsearch are you using?

Does Elasticsearch report any errors in the bulk response? If so, what is reported?

We’re on version 8.5.1.

Using small batches there are no errors, but the throughput is not enough to empty the Kafka queue where data waits to be indexed, and documents still end up not indexed.

Using larger batches, we get some BulkIndexError failures (“1 document was not indexed”) and some API 413 errors (Request Entity Too Large).

We’re working on splitting batches as little as possible, to increase speed and resolve some of the 413 errors. For the BulkIndexError we don’t have any clue what the cause could be, as this is the only error message Elasticsearch returns.
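For the 413s, one pattern is to split a rejected batch and retry each half; a rough sketch, assuming the 8.x client where ApiError exposes the HTTP status via status_code (the helper name and strategy are illustrative):

from elasticsearch import ApiError, helpers

async def bulk_with_split_retry(client, batch: list[dict]):
    try:
        await helpers.async_bulk(client, batch)
    except ApiError as api_e:
        # 413 = request entity too large: halve the batch and retry each part
        if api_e.status_code == 413 and len(batch) > 1:
            mid = len(batch) // 2
            await bulk_with_split_retry(client, batch[:mid])
            await bulk_with_split_retry(client, batch[mid:])
        else:
            raise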

Hi @Mario_Betterplace,

When I got this error in the past, I could resolve it by adding a line to find out where the issue was. This example is from the code I used to create this blog post.

# response is the (success_count, errors) tuple returned by helpers.bulk
if response[1]:
    for idx, item in enumerate(response[1]):
        if item['index']['status'] != 201:
            print(f"Document with GAME_ID {celtics_current_season.iloc[idx]['GAME_ID']} failed to index.")
            print(f"Error: {item['index']['error']}")

I also found that sometimes, when these errors occur, the issue is because of null values. To see if I have null values in my dataset, I can run the following line of code:

celtics_current_season.isnull().values.any()

If the response I get back is True, I can create a copy of my DataFrame and fill in the null values:

celtics_current_season_copy = celtics_current_season.copy()
celtics_current_season_copy.fillna(0, inplace=True)

The full code from this example can be found here.

Hope this helps!

Hi @jessgarson,

I’ve reviewed the code you proposed, and it doesn’t produce any errors when indexing documents.

This issue occurs when updating documents as well as indexing new ones.

Here’s an example: let’s update a document that is already present in Elastic to see if it updates with a delay. The ES document has the field {'archived': false}, and the new state of the document is {'archived': true}.

  1. We generate the action from the JSON document:

    {
        "_id": "eb98425e-8a92-49e3-bb27-cd8e5178d908",
        "_index": "homes_eu_es",
        "_op_type": "index",
        "_source": "{'archived': false ..... }"  # The document is very big...
    }
    

    The documents we index are quite large, so in this example, the document has been shortened.

  2. We send the document to Elasticsearch using helpers.async_bulk:

    for batched_actions in batched_by_bytes(actions):
        for i in range(max_tries):
            try:
                success, errors = await helpers.async_bulk(client=get_client(), actions=batched_actions)
                if errors:
                    logger.error(f"\t\tElastic Bulk - Partial errors on index:\n{errors}")
                else:
                    batch_mb = len(json.dumps(batched_actions).encode("utf-8")) / (1024 * 1024)
                    logger.info(f"\t\tBulk with {len(batched_actions)} families indexed successfully {batch_mb} MB")
                    break
            except ApiError as api_e:
                logger.error(f"\t\tElasticsearch API error while updating index:\n{api_e!r}")
            except Exception as e:
                logger.error(f"\t\tException while bulk updating index - tries:{i}/{max_tries}\n{e!r}")
    
  3. We check the indexing results for the document and see that it responds with success = 1 and errors = []. We also observe no ApiErrors or any other exceptions caught by the try...except block.

  4. We then search for the document using the ID we had previously, and we see that it hasn’t been updated even after 5 minutes, as it still has the value {'archived': false}. After 1 hour, we can see that in this case, it has been updated, but sometimes it doesn’t update or index until several days later.

How do you "search using the ID"? Are you using the Get API or the Search API? If you use the Get API, then it does not involve any search and you should see the updated values as soon as the bulk operation reports success.

However, if search is involved, you will need a refresh to see updated data. See Near real-time search | Elasticsearch Guide [8.17] | Elastic for more details. The only way that I can think of for a document to not be updated "after several days" is if there were no search at all to that index for days. The first search will not return the correct value, but then that will trigger a refresh, and the next search after that refresh will return the correct value.
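To illustrate with the Python client, reusing the index and document ID from your example (assuming an async client instance named client):

# The Get API is real-time: it reflects the bulk update as soon as it succeeds
doc = await client.get(index="homes_eu_es", id="eb98425e-8a92-49e3-bb27-cd8e5178d908")
print(doc["_source"]["archived"])

# The Search API only sees the update after the next refresh
resp = await client.search(
    index="homes_eu_es",
    query={"ids": {"values": ["eb98425e-8a92-49e3-bb27-cd8e5178d908"]}},
)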

You can trigger a refresh using the Elasticsearch Python client with client.indices.refresh(index="homes_eu_es"). Also note that your code does not raise an error even if errors are reported. Are you sure that you did not get any errors?
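If documents need to be searchable as soon as the bulk call returns, you can also ask the bulk request itself to wait for the refresh; a minimal sketch (the helpers forward extra keyword arguments such as refresh to the Bulk API):

success, errors = await helpers.async_bulk(
    client=get_client(),
    actions=batched_actions,
    refresh="wait_for",  # block until the changes are visible to search
)

Unlike refresh=True, wait_for does not force an extra refresh per request, so the throughput cost is usually lower.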
