We're bulk indexing documents with this function:
```python
import json
from itertools import islice

from elasticsearch import ApiError, helpers

# ElasticJsonEncoder, get_client() and MAX_BULK_BYTES_SIZE are defined elsewhere in our code.


async def bulk_update_index(index: str, docs: dict[str, dict]):
    actions = [
        {
            "_index": index,
            "_op_type": "index",
            "_id": doc_id,
            "_source": json.dumps(doc, cls=ElasticJsonEncoder),
        }
        for doc_id, doc in docs.items()
    ]
    max_tries = 10
    for batched_actions in batched_by_bytes(actions):
        for i in range(max_tries):
            try:
                success, errors = await helpers.async_bulk(
                    client=get_client(), actions=batched_actions
                )
                if errors:
                    print(f"ERROR\tElastic Bulk - Partial errors on index:\n{errors}")
                break  # bulk call completed (possibly with partial errors), stop retrying
            except ApiError as api_e:
                print(f"ERROR\tElasticsearch API error while updating index:\n{api_e!r}")
            except Exception as e:
                print(f"ERROR\tException while bulk updating index - tries:{i}/{max_tries}\n{e!r}")


def batched_by_bytes(actions: list, batch_size: int = 50):
    # If the whole payload would exceed the bulk size limit, split it into
    # fixed-size chunks of `batch_size` actions; otherwise send it as-is.
    stringified_actions = json.dumps(actions)
    if len(stringified_actions.encode("utf-8")) > MAX_BULK_BYTES_SIZE:
        it = iter(actions)
        while batch := tuple(islice(it, batch_size)):
            yield batch
    else:
        yield actions
```
The problem is that when we check whether all the documents have actually been indexed in Elasticsearch, they're not: some documents were indexed but others weren't. Over time the missing documents do show up, sometimes after hours or even days.
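A minimal version of that check could look like the sketch below (illustrative only; `verify_indexed` is a hypothetical helper, and `get_client()` is the same helper used in `bulk_update_index`).

```python
# Illustrative sketch of the verification step (verify_indexed is hypothetical,
# get_client() is the same helper used in bulk_update_index).
async def verify_indexed(index: str, expected_ids: set[str]) -> set[str]:
    """Return the subset of expected_ids that cannot be found in the index."""
    client = get_client()
    missing = set()
    for doc_id in expected_ids:
        # GET-by-ID lookup for each document we expect to have indexed.
        if not await client.exists(index=index, id=doc_id):
            missing.add(doc_id)
    return missing
```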
We're using the batched_by_bytes function to make sure the actions never exceed Elasticsearch's 100 MB limit on bulk requests.
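For comparison, a variant that cuts batches on the actual serialized size of each action, rather than a fixed count of 50, could look roughly like this (a sketch only; `batched_by_max_bytes` is a hypothetical name, and the small per-action overhead of the bulk newline-delimited envelope is ignored):

```python
def batched_by_max_bytes(actions: list, max_bytes: int = MAX_BULK_BYTES_SIZE):
    """Yield chunks of actions whose combined serialized size stays under max_bytes (approximate)."""
    batch, batch_bytes = [], 0
    for action in actions:
        action_bytes = len(json.dumps(action).encode("utf-8"))
        # Start a new chunk if adding this action would push the current one over the limit.
        if batch and batch_bytes + action_bytes > max_bytes:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(action)
        batch_bytes += action_bytes
    if batch:
        yield batch
```

The only difference from our current function is that chunks are sized by measured payload bytes instead of a hard count of 50 actions.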
We've tried using an ingest node and the result was even worse: searches were extremely slow and no documents were indexed at all. We also tried the bulk indexer's refresh option to refresh the index on every bulk request, but that didn't help either; it only slowed down indexing.
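The refresh attempt was essentially the following (simplified sketch; it relies on async_bulk forwarding extra keyword arguments to the underlying bulk API):

```python
# Per-bulk refresh attempt (in our case it only slowed indexing down):
success, errors = await helpers.async_bulk(
    client=get_client(),
    actions=batched_actions,
    refresh="wait_for",  # wait until the changes are visible to search before returning
)
```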
Is there any way to make sure that all the documents are indexed correctly, and to crash if they are not? This process runs continuously, as it's streaming documents every second.
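To be concrete, the behavior we're after is roughly this (a sketch of the intent, not code we have working; `raise_on_error` and `raise_on_exception` are existing parameters of the elasticsearch-py bulk helpers, the surrounding count check is illustrative):

```python
success, errors = await helpers.async_bulk(
    client=get_client(),
    actions=batched_actions,
    raise_on_error=True,      # raise BulkIndexError if any document fails to index
    raise_on_exception=True,  # raise if the underlying bulk call itself fails
)
if success != len(batched_actions):
    # Crash the pipeline instead of silently continuing with missing documents.
    raise RuntimeError(f"Only {success}/{len(batched_actions)} documents were indexed")
```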