Updating all documents using a dictionary as input

Update All Documents

Background Information

I have a use case where I need to update all documents in my index. My source looks similar to the below:

{
  'hits': [
   {'_index': 'main-index-v2',
    '_type': '_doc',
    '_id': 'DNA_PORTMN0020220108ei180000t',
    '_score': 8.403202,
    '_source': {'id': 'ID_2238309812_hg456',
        'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
        'non_employee_ids': [],
        'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
        'local_date': '2022/01/10',
        'local': True,
    ...
} 

I can easily search my index using a multi_match query; however, this only searches for a single ID at a time.

def create_multi_query(ids: str, fields: list = ['employee_ids', 'non_employee_ids', 'friends_id']):
    # Build a multi_match query that looks for a single ID across the list fields.
    return {
        "query": {
            "multi_match": {
                "query": ids,
                "fields": fields,
                "operator": "or"
            }
        }
    }

hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')

Challenge

I get a daily feed that provides me with a dictionary - the keys are old ids and values are new ids. The resulting data looks similar to a python dictionary or json object.

Example:

{'J1234': 'J2875', 'CH1234': 'J2879'}

The dictionary contains employee_ids, friends_id, and non_employee_ids. The challenge is: how can I submit a JSON object like the above to update all matching documents?
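For reference, one way to express "all documents that contain any key of the dictionary" as a single query is a bool query of terms clauses. The sketch below is only an illustration (not tested) and assumes the three fields from the example are keyword-mapped:

def create_terms_query(id_map: dict,
                       fields=('employee_ids', 'non_employee_ids', 'friends_id')):
    # One terms clause per field; a document matches if any field contains any old ID.
    old_ids = list(id_map.keys())
    return {
        "query": {
            "bool": {
                "should": [{"terms": {field: old_ids}} for field in fields],
                "minimum_should_match": 1
            }
        }
    }

query = create_terms_query({'J1234': 'J2875', 'CH1234': 'J2879'})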

My Solution (Thus far)

I have written a Painless script to update the IDs, but it requires a separate for loop for each (list-valued) field we want to update. The script loops through each field's list one item at a time: if the current item matches the parameter 'fromId', it appends 'toId' to a new list; otherwise it appends the current item unchanged. It then sets the field to the new list.

Painless Script example

def result = [];
for (def item : ctx._source.employee_ids) {
    if (item == params.fromId) {
        result.add(params.toId);
    } else {
        result.add(item);
    }
}
ctx._source.employee_ids = result;

def resultF = [];
for (def item : ctx._source.friends_id) {
    if (item == params.fromId) {
        resultF.add(params.toId);
    } else {
        resultF.add(item);
    }
}
ctx._source.friends_id = resultF;

This can be executed via UpdateByQuery from the elasticsearch_dsl library.

Example of the update call:

def partial_update(es, items: dict):
    # Make sure the cluster is reachable before issuing updates.
    assert es.ping() is True
    tmp = []
    # Loop over the {old_id: new_id} mapping and run one update per pair.
    for from_id, to_id in items.items():
        result = execute_intermediate(from_id, to_id)
        tmp.append(result)
    return tmp

@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
    # Run the Painless script above against the whole index for a single (from_id, to_id) pair.
    from elasticsearch_dsl import UpdateByQuery
    ubq = UpdateByQuery(
        using=auth_es(),
        doc_type='doc', index=settings.ES_WRITE_INDEX,
    )
    ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
    ubq = ubq.params(wait_for_completion=True)
    res = ubq.execute().to_dict()
    return res

An intermediate function executes the update for a single ID and is wrapped with a retry decorator.
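For illustration, a call with the daily feed would look roughly like this (assuming es is the authenticated client and the feed has already been parsed into a dict):

daily_feed = {'J1234': 'J2875', 'CH1234': 'J2879'}
results = partial_update(es, daily_feed)  # one UpdateByQuery per (old_id, new_id) pair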

Issues

  1. Doing it this way requires me to loop through my dictionary one entry at a time to perform the updates.

  2. Not every ID contained in the python dictionary exists in my index; in fact only about 25% do (see the pre-filtering sketch after this list).

  3. If I want to increase the number of fields to update, I need to add another for loop to the script.

  4. One of the big challenges is that the fields in '_source' are lists.
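Regarding issue 2, a rough pre-filtering sketch (not tested; it assumes keyword-mapped fields and uses one count request per ID to check whether the old ID occurs anywhere) could drop the roughly 75% of IDs that are not in the index before any update is sent:

def filter_existing(es, id_map: dict,
                    fields=('employee_ids', 'non_employee_ids', 'friends_id')):
    # Keep only the {old_id: new_id} pairs whose old ID actually appears in the index.
    existing = {}
    for old_id, new_id in id_map.items():
        body = {
            "query": {
                "bool": {
                    "should": [{"term": {field: old_id}} for field in fields],
                    "minimum_should_match": 1
                }
            }
        }
        if es.count(index='main-index-v2', body=body)['count'] > 0:
            existing[old_id] = new_id
    return existing

This still issues one lightweight request per ID, but a count is far cheaper than a full update-by-query.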

Questions

What is the best / most optimal way to update all of these fields in '_source' based on the above?

Is there a way to send a dictionary, find all documents matching its keys, and update them with its values in a single call? Can this be done when the field is a list?

Any advice will be most appreciated.

Hi,

I suppose a scripted update is a good way, and as far as I know there is no out-of-the-box feature for this case.

One improvement is to modify the arrays in place. Another is to pass the whole ID mapping as a map in params.

PUT /test_replace_id/
{
  "mappings": {
    "properties": {
      "employee_ids":{
        "type": "keyword"
      }
    }
  }
}

POST /test_replace_id/_doc/1
{
  "employee_ids": ["old1","old2"],
  "frieds_id": "old1"
}

POST /test_replace_id/_update/1
{
  "script": {
    "source": """
      for (t in params.targets){
        if (ctx._source[t] instanceof List){
          for (int j=0; j<ctx._source[t].length; j++){
            if (params.map.containsKey(ctx._source[t][j])) {
              ctx._source[t][j] = params.map.get(ctx._source[t][j])
            }
          }
        }else{
          if (params.map.containsKey(ctx._source[t])) {
            ctx._source[t] = params.map.get(ctx._source[t])
          }
        }
      }
    """,
    "params":{
      "targets": ["employee_ids","frieds_id"],
      "map": {"old1":"new1"}
    }
  }
}
GET /test_replace_id/_search


This is a great improvement.

I'm adding one modification to include an update date.

Instant currentDate = Instant.ofEpochMilli(new Date().getTime());
...
ctx._source.updated_datetime = currentDate;

This question has been answered, but one follow-up: when I run the UpdateByQuery, ES returns a Timeout Error, Connection Error, or Conflict Error, as some documents contain multiple IDs to be updated. How can I send this update to the Task Management API instead?

Set wait_for_completion=false?
Sorry, I'm not familiar with tasks in Elasticsearch.

@Tomo_M

This works well. Thanks


For reference:

import elasticsearch
from elasticsearch_dsl import UpdateByQuery

def update_and_execute(index_name: str, targets: list, results: dict, script: str):
    # Fire the map-based Painless script as an async update-by-query, split into 10 slices.
    ubq = UpdateByQuery(using=es, index=index_name, doc_type='doc') \
        .params(wait_for_completion=False, slices=10) \
        .script(source=script, lang='painless', params={'targets': targets, 'map': results})
    res = ubq.execute()
    return res

res = update_and_execute("my-index", TARGETS, MAPPER, UPDATE_SCRIPT)
tasks = elasticsearch.client.TasksClient(es)
tasks.get(res.to_dict()['task'])

The above returns a task ID, and the update is performed across 10 slices. Each slice reports metadata like the below, with slice IDs numbered 0 to N-1.

{'slice_id': 0,
 'total': 824,
 'updated': 824,
 'created': 0,
 'deleted': 0,
 'batches': 1,
 'version_conflicts': 0,
 'noops': 0,
 'retries': {'bulk': 0, 'search': 0},
 'throttled_millis': 0,
 'requests_per_second': -1.0,
 'throttled_until_millis': 0}
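For completeness, a minimal polling sketch (assuming the tasks client and res from above; the Tasks API response includes a 'completed' flag once the async update-by-query has finished):

import time

def wait_for_task(tasks_client, task_id, poll_seconds=5):
    # Poll the Task Management API until the async update-by-query reports completion.
    while True:
        status = tasks_client.get(task_id)
        if status.get('completed'):
            return status
        time.sleep(poll_seconds)

status = wait_for_task(tasks, res.to_dict()['task'])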

Cheers.

