Update All Documents
Background Information
I have a use case where I need to update all documents in my index. My source looks similar to the below:
{
'hits': [
{'_index': 'main-index-v2',
'_type': '_doc',
'_id': 'DNA_PORTMN0020220108ei180000t',
'_score': 8.403202,
'_source': {'id': 'ID_2238309812_hg456',
'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
'non_employee_ids': [],
'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
'local_date': '2022/01/10',
'local': True,
...
}
I can easily search my index using the multi_match query, however this is for a single ID.
def create_multi_query(ids: str, fields: list=['employee_ids', 'non_employee_ids', 'friends_id']):
return {
"query": {
"multi_match": {
"query": f"{ids}",
"fields": fields,
"operator": "or"
}
}
}
hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')
Challenge
I get a daily feed that provides me with a dictionary - the keys are old ids and values are new ids. The resulting data looks similar to a python dictionary or json object.
Example:
{'J1234': 'J2875', 'CH1234': 'J2879'}
The dictionary contains employee_ids, friends_id, and non_employee_ids. The challenge I have is, how can I submit a json object containing the above, to update all documents matching?
My Solution (Thus far)
I have written a painless script to update the ids, however it requires a for loop for each field (that is a list) we want to update. What the script does is loop through each field, one by one. If the current item in the list matches our parameter 'fromId' we append to a list the 'toId', otherwise add the current item to the list and move on. We then set the field equal to the new list.
Painless Script example
def result = [];
for (def item: ctx._source.employee_ids)
{
if (item == params.fromId) {
result .add(params.toId)
}
else {
result .add(item)
}} ctx._source.employee_ids= result;
def resultF = [];
for (def item: ctx._source.friends_id)
{
if (item == params.fromId) {
resultF .add(params.toId)
}
else {
resultF .add(item)
}} ctx._source.friends_id = resultF ;
This is able to be executed via UpdateByQuery within the elasticsearch_dsl
library.
Example of the Update call.
def partial_update(es, items: dict):
assert es.ping() is True
tmp = []
for from_id, to_id in items.items():
result = execute_intermediate(from_id, to_id)
tmp.append(result)
return tmp
@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
from elasticsearch_dsl import UpdateByQuery
ubq = UpdateByQuery(
using=auth_es(),
doc_type='doc', index=settings.ES_WRITE_INDEX,
)
ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
ubq = ubq.params(wait_for_completion=True)
res = ubq.execute().to_dict()
return res
Create an intermediate function to execute the update on the single ID, wrapping with a retry decorator.
Issues
-
Doing it this way requires me to loop through my dictionary one by one to perform the update.
-
Not every ID that is contained in the python dictionary exists in my index, in fact only 25% do.
-
If I want to increase the number of fields we want to update, I need to add a new for loop.
-
One of the big challenges is that the field in '_source' has a list data structure.
Questions
What is the best / most optimal solution to update all fields in source based on the above?
Is there a way to send a dictionary to find all the documents matching the keys, updating with the values in a single call? Can this be done if the field data structure is a list?
Any advice will be most appreciated.