Bulk API hangs forever in a Python cloud function

I am new to Elasticsearch and this issue is driving me crazy. My use case involves getting all documents from Elasticsearch, min-max normalising some fields, and then updating the documents in bulk — but my bulk call just hangs forever and absolutely no response is returned. What's truly frustrating is that the only pattern to this behaviour is that it happens on a cloud function where I am deploying it, and it happens near the halfway point. It never happens when testing locally. Please help me — what is even going on?

actions = []
        write_batch_size = 1000

        def process_hit(hit):
            runtime_fields = hit["fields"] if "fields" in hit else {}
            self.__normalize_hit(hit["_source"], min_max_map, runtime_fields)
            action = {
                "_op_type": "update",
                "_index": hit["_index"],
                "_id": hit["_id"],
                "doc": hit["_source"],
                "doc_as_upsert": True,
            }
            actions.append(action)

        def do_bulk_update():
            self.logger.info("Starting bulk update")
            bulk_response = bulk(client=self.es_client, actions=actions, timeout="30s")
            self.logger.info("Bulk responded with %s", bulk_response)

        query = {"match_all": {}}
        script_fields = self._get_search_script_for_run_time_fields()
        self.logger.info("Starting getting docs from ES")
        result = self.utils.safely_call_elasticsearch(
            self.es_client.search,
            index=self.conf["es_index"],
            query=query,
            scroll=self.conf["scroll_time"],
            size=int(self.conf["scroll_size"]),
            source=True,
            source_includes=self.fields_to_normalize,
            script_fields=script_fields,
        )
        scroll_id = result["_scroll_id"]
        total_hits = result["hits"]["total"]["value"]
        self.logger.info("Got total %s documents from ES", total_hits)
        hits = result["hits"]["hits"]
        self.logger.info("Got back %s from ES", len(hits))
        processed_count = 0

        for hit in hits:
            process_hit(hit)

        while len(hits) > 0:
            result = self.utils.safely_call_elasticsearch(
                self.es_client.scroll,
                scroll_id=scroll_id,
                scroll=self.conf["scroll_time"],
            )
            scroll_id = result["_scroll_id"]
            hits = result["hits"]["hits"]
            self.logger.info("Got back %s from ES", len(hits))
            for hit in hits:
                process_hit(hit)
                if len(actions) % write_batch_size == 0:
                    processed_count += write_batch_size
                    do_bulk_update()
                    actions = []
                    self.logger.info(
                        "Processed %s. Total %s. Left %s",
                        processed_count,
                        total_hits,
                        total_hits - processed_count,
                    )

        if actions:
            do_bulk_update()
        self.es_client.clear_scroll(scroll_id=scroll_id)
        self.logger.info("Finished normalizing documents.")

@abdulz

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.