Python script update by query elasticsearch doesn't work

My python script fails at performing the following operation which is what I found in here:
[https://stackoverflow.com/questions/42489340/elastisearch-update-by-query]

def execute_es_update_query(self, i):
    t1 = {
        "script": {
            "inline": "ctx._source.tomato_activity = 'Yes'"
        },
        "query": {
            "range":{
                "tomato_count": {
                    "gte": 0
                }
            }
        }
    }

    t2 = {
        "script": {
            "inline": "ctx._source.tomato_activity = 'No'"
        },
        "query": {
            "match":{
                "tomato_count": 0
            }
        }
    }

    t3 = {
        "script": {
            "inline": "ctx._source.tomato_activity = 'None'"
        },
        "query": {
            "range":{
                "tomato_count": {
                    "lte": 0
                }
            }
        }
    }

    r1 = {
        "script": {
            "inline": "ctx._source.row_activity = 'Yes'"
        },
        "query": {
            "range":{
                "row_captured": {
                    "gte": 1
                }
            }
        }
    }

    r2 = {
        "script": {
            "inline": "ctx._source.row_activity = 'No'"
        },
        "query": {
            "match":{
                "row_captured": 0
            }
        }
    }

    r3 = {
        "script": {
            "inline": "ctx._source.row_activity = 'None'"
        },
        "query": {
            "range":{
                "row_captured": {
                    "lte": 0
                }
            }
        }
    }
    
    queries = list()
    queries = [t1, t2, t3, r1, r2, r3]

    if self.es.indices.exists(index = i):
        for q in queries:
            self.es.update_by_query(index = i, body = q)

Error:

Traceback (most recent call last):
  File "/usr/share/logstash/script/customer_model/run.py", line 19, in <module>
    ecoation.execute_operations()
  File "/usr/share/logstash/script/customer_model/merge.py", line 262, in execute_operations
    self.execute_es_update_query(self.index)
  File "/usr/share/logstash/script/customer_model/merge.py", line 224, in execute_es_update_query
    self.es.update_by_query(index = i, body = q)
  File "/usr/local/lib/python3.6/dist-packages/elasticsearch/client/utils.py", line 84, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/elasticsearch/client/__init__.py", line 942, in update_by_query
    "POST", _make_path(index, "_update_by_query"), params=params, body=body
  File "/usr/local/lib/python3.6/dist-packages/elasticsearch/transport.py", line 350, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.6/dist-packages/elasticsearch/connection/http_urllib3.py", line 252, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python3.6/dist-packages/elasticsearch/connection/base.py", line 181, in _raise_error
    status_code, error_message, additional_info
elasticsearch.exceptions.ConflictError: ConflictError(409, '{"took":259,"timed_out":false,"total":132,"updated":0,"deleted":0,"batches":1,"version_conflicts":132,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[{"index":"api-athena-fruitcount-test-2019.09.12","type":"_doc","id":"dO1lJ20BLWZVc3oBj40F","cause":{"type":"version_conflict_engine_exception","reason":"[dO1lJ20BLWZVc3oBj40F]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [192] and primary term [1]","index_uuid":"EwJeXIC1RyehtGOwKIHArw","shard":"0","index":"api-athena-fruitcount-test-2019.09.12"},"status":409},

Solution:

Reference:

I made this change to my python code and it worked:

I added self.es.indices.refresh(index = i)

if self.es.indices.exists(index = i):
            for q in queries:
                self.es.indices.refresh(index = i)
                self.es.update_by_query(index = i, body = q)

Hi @EZprogramming

Just one information about your solution to refresh the index before each query, you need to know about the cost of a refresh call.



http://blog.mikemccandless.com/ <--- this blog a little bit old but have a lot of resources about how lucene work.

Knowing that and to prevent run unnecessary refresh can be keep the code as before and add a try catch if the exception is a ConflictError so you can run a refresh and call again your update.
something like:

    from elasticsearch.exceptions import ConflictError

    if self.es.indices.exists(index = i):
        for q in queries:
            try:
                self.es.update_by_query(index = i, body = q)
            except ConflictError as e:
                # a conflict exception is raised more info here: https://discuss.elastic.co/t/python-script-update-by-query-elasticsearch-doesnt-work
                self.es.indices.refresh(index = i)
                # try again
                self.es.update_by_query(index = i, body = q)

2 Likes

@gabriel_tessier, thanks for pointing that out. Your approach is more efficient, I agree.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.