Individually update a large number of documents with the Python DSL

Hi! I'm trying to use UpdateByQuery to update a property of a large number of documents. I computed the new values in Python, and now I need to write them back to the index. Since each document gets a different value, I have to execute the updates one by one. I'm traversing a large collection of documents, and for each document I call this function:

def update_references(self, query, script_source):

    try:
        ubq = UpdateByQuery(using=self.client, index=self.index).update_from_dict(query).script(source=script_source)
        ubq.execute()

    except Exception as err:
        return False

    return True

Some example values are:

query = {'query': {'match': {'_id': 'VpKI1msBNuDimFsyxxm4'}}}
script_source = "ctx._source.refs = ['python', 'java']"

The problem is that when I do that, I get an error: "Too many dynamic script compilations within, max: [75/5m]; please use indexed, or scripts with parameters instead; this limit can be changed by the [script.max_compilations_rate] setting".

If I change the max_compilations_rate using Kibana, it has no effect:

PUT _cluster/settings
{
"transient": {
"script.max_compilations_rate": "1500/1m"
}
}
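
For reference, the equivalent call from the Python client would look something like the sketch below (it just applies the same transient setting programmatically; raising the limit doesn't avoid the per-document compilations, it only postpones the error):

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assuming a local cluster on localhost:9200

# Same transient setting as the Kibana request above; this only raises
# the compilation budget, it does not stop a new script from being
# compiled for every document.
es.cluster.put_settings(body={
    'transient': {'script.max_compilations_rate': '1500/1m'}
})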
Anyway, I don't want to depend on the performance of the computer, so it would be better to use a parametrized script. I tried:

def update_references(self, query, script_source, script_params):

    try:
        ubq = UpdateByQuery(using=self.client, index=self.index).update_from_dict(query).script(source=script_source, params=script_params)
        ubq.execute()

    except Exception as err:
        return False

    return True

So, this time:

script_source = 'ctx._source.refs = params.value'
script_params = {'value': ['python', 'java']}
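
For reference, the request body I expect the DSL to build from these values looks like the sketch below; since the actual values travel in params, the script source stays constant and should only be compiled once:

# Sketch of the update-by-query body built from the query, script_source
# and script_params above; the script source never changes, only params do.
body = {
    'query': {'match': {'_id': 'VpKI1msBNuDimFsyxxm4'}},
    'script': {
        'source': 'ctx._source.refs = params.value',
        'lang': 'painless',
        'params': {'value': ['python', 'java']}
    }
}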

But since the query and the parameters change on every call, I create a new UpdateByQuery instance for each document in the large collection, and the result is the same error.

I also tried to traverse and update the large collection with:

es.update(
index=kwargs["index"],
doc_type="paper",
id=paper["_id"],
body={"doc": {
"refs": paper["refs"] # e.g. [\'python\', \'java\']
}}
)

But I'm getting the following error: "Failed to establish a new connection: [Errno 99] Cannot assign requested address juil. 10 18:07:14 bib gunicorn[20891]: POST http://localhost:9200/papers/paper/OZKI1msBNuDimFsy0SM9/_update [status:N/A request:0.005s"

Then, I tried an update by script to update a large number of documents at the same time:

q = {
    "script": {
        "inline": script_code,
        "lang": "painless"
    },
    "query": {"match_all": {}}
}

es.update_by_query(body=q, doc_type='paper', index=self.index, params={"docs": papers})

And this time I got: Error: RequestError(400, 'too_long_frame_exception', 'An HTTP line is larger than 4096 bytes.'). I suspect this is because the params argument is sent as URL query-string parameters, so the whole papers list ends up on the request line.

So, please, any idea on how to solve this problem and update my fields would be really appreciated.
Best,

Hi @gbosetti,

It seems the update_by_query API is not the best approach for this use case. It was designed to update all documents that match a query using the provided script. But in your example, the query selects only 1 document, since you specify its id.

query = {'query': {'match': {'_id': 'VpKI1msBNuDimFsyxxm4'}}}

In order to update documents by their ids, you should use the _update API. Below is Python code that you can use as a reference to update all documents in an index. The steps are:

  1. Delete the sample index my_test_index, if exists;
  2. Index 500 documents using the _bulk API;
  3. Iterate over all 500 documents and update them using the _update API. The script takes a random multiplier through params to show that it is compiled only once, even though the parameter value changes on every call.

# -*- coding: utf-8 -*-

import random
from elasticsearch import Elasticsearch, helpers

index_name = 'my_test_index'
doc_type = '_doc'

es = Elasticsearch(
    http_auth=('elastic', 'PASSWORD')
)

# Step 1: delete the sample index, if it exists
print(es.indices.delete(index=index_name, ignore=[400, 404]))

# Step 2: build 500 sample documents for the _bulk helper
actions = []
for i in range(500):
    actions.append({
        '_index': index_name,
        '_type': doc_type,
        '_source': {
            'project': 'project {}'.format(i),
            'my_field': 2
        }
    })

# Create 500 documents
helpers.bulk(es, actions, refresh=True)


# Step 3: update each document individually via the _update API; the
# script source is constant, so it is only compiled once
for document in helpers.scan(es, index=index_name):
    es.update(index=document['_index'], doc_type=document['_type'], id=document['_id'], body={
        "script" : {
            "source": "ctx._source.my_field += ctx._source.my_field * params.count",
            "lang": "painless",
            "params" : {
                "count" : random.randint(1, 100000)
            }
        }
    })

As a result, all documents have the field my_field updated:

GET my_test_index/_search

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1000,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test",
        "_type" : "_doc",
        "_id" : "e2XMD2wBmjmwltfkszzP",
        "_score" : 1.0,
        "_source" : {
          "project" : "project 0",
          "my_field" : 155636
        }
      },
      ...
    ]
  }
}
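
As a side note: if sending one _update request per document becomes too slow for a very large collection, the same per-document updates can be batched with the bulk helper instead. This is only a sketch of the idea; it assumes a papers list holding each document's id and its new refs value, as in your examples:

# Sketch: batch per-document partial updates with the bulk helper.
# Assumes `papers` is a list of dicts such as
# {'_id': 'VpKI1msBNuDimFsyxxm4', 'refs': ['python', 'java']}
from elasticsearch import helpers

def bulk_update_refs(es, index_name, papers):
    actions = (
        {
            '_op_type': 'update',           # partial update, not a reindex
            '_index': index_name,
            '_type': 'paper',               # assumed from your mapping
            '_id': paper['_id'],
            'doc': {'refs': paper['refs']}  # per-document value, no script
        }
        for paper in papers
    )
    helpers.bulk(es, actions)  # sends the updates in batches

No script is compiled here at all, so the max_compilations_rate limit never comes into play.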

I hope this helps!

Cheers,
LG

