Reindex with Terms query

Hello,

Every day I create an index of approximately 10 GB. Once the index is complete, I need to reindex a certain set of documents into a new, filtered index. To filter the documents I'm using a terms query with the terms that I want to allow into the new index. I'm currently using around one million terms in my reindex query. From the documentation I read that the maximum number of terms defaults to 65,536, which is considerably lower than the number of terms that I need to use.

As a workaround, I'm using the Python API and reindexing the documents in "batches": I divide the list of one million terms into batches of 50,000 terms and reindex with each batch. To control the number of simultaneous reindexing processes I set the wait_for_completion parameter to True. My script is reindexing the data correctly (as far as I've seen). However, the reindexing process is taking too long. I've read comments about considerably larger indices being reindexed in a day; mine has taken about 6 hours for just 13 of the 35 batches.
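Roughly, each batch runs a call along these lines (index names and the ID field are placeholders for my actual setup, and the short terms list stands in for one batch of 50,000 IDs):

POST _reindex
{
  "source": {
    "index": "source_index",
    "query": {
      "terms": {
        "id": ["id-0001", "id-0002", "id-0003"]
      }
    }
  },
  "dest": {
    "index": "dest_index"
  }
}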

I think this is because I'm running 35 reindexing processes one after the other. I have two main questions:

  1. Is it possible to approach this problem in a different manner? Can I reindex directly from Elasticsearch with that number of terms? Any ideas are very welcome.

  2. In case my current solution is "the only" way to proceed, could you give me some insight into the size of the batches? Is it more efficient to have a lot of batches with few terms, or just a few batches with a lot of terms?

Many thanks in advance,
Best regards,

Hi @tmslara.a

So I am not an expert at optimizing terms queries at the scale you are talking about.

But I solved a similar problem with a novel approach.

But first, a question: does the filtering work on

  1. A single term match: a single term in the incoming document can match any one or more terms in the "term catalog"
  2. A small set of term matches (say, fewer than 10): several fields in the incoming document need to match one or more terms in the "term catalog"
  3. A large set of term matches (hundreds or more)

In other words can you describe the matching process a bit further?

In short, I used an enrich processor: I put the "catalog of terms" in an index, created an enrich index, and used an enrich processor to do the match... then I marked the doc if it matched and dropped the document if it did not, i.e. only kept the documents that matched.

Technically you could do this on ingest of the original doc, but I suspect you want to keep your original documents intact.

Not saying it would work but you asked for alternatives :)


Thank you for your answer.

Sure. I have a catalog of terms, specifically IDs, which are strings of (approximately) 25-30 characters. The catalog includes around one million of these IDs. I need to reindex a source index using a terms query so that it only includes documents whose IDs are in the catalog. One ID can be associated with more than one document, but generally with fewer than 10 docs. If, for example, 4 documents have an ID that is in the ID catalog, I need those 4 documents in my destination index. The origin index has around 150,000,000 docs.

Exactly, I need to keep the original data intact. The purpose of reindexing is to focus my analysis on just a subset of documents with certain characteristics.

So

a) you could reindex with the method I described using a pipeline and an enrich processor with a drop processor to drop the documents that do not match...

b) Of course, on ingest you could "enrich" the data as well and then just filter when you do your analysis; Elasticsearch is great at filtering.

Does this make sense?

I assume a) is still what you want. Then you have the power of reindex with slicing, etc.

Here is a full working stub; see if you can follow it...

You will need to refer to the docs on enrich and _reindex

The result is a destination index that is enriched with the lookup data, with docs that do not have a match dropped. (Again, you could do this on ingest instead and then just filter on whether the enriched field exists; see the example right below.)
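For reference, that filter would just be an exists query on the enrich target_field (shown here against the discuss-dest index created at the end of the stub; in the on-ingest variant it would be your original index):

GET discuss-dest/_search
{
  "query": {
    "exists": {
      "field": "enriched"
    }
  }
}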

DELETE discuss-source

PUT discuss-source/
{
  "mappings": {
    "properties": {
      "foo": {
        "type": "keyword"
      },
      "code": {
        "type": "keyword"
      }
    }
  }
}


POST discuss-source/_doc
{
  "foo" : "yoda",
  "code" : "master"
}

POST discuss-source/_doc
{
  "foo" : "luke",
  "code" : "master"
}

POST discuss-source/_doc
{
  "foo" : "han",
  "code" : "ally"
}

POST discuss-source/_doc
{
  "foo" : "mando",
  "code" : "mandalorian"
}

DELETE discuss-term-lookup

PUT discuss-term-lookup/
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      },
      "id": {
        "type": "keyword"
      },
      "data": {
        "type": "keyword"
      }
    }
  }
}


POST discuss-term-lookup/_doc
{
  "code" : "master",
  "id" : "1000",
  "data": "friend"
}

POST discuss-term-lookup/_doc
{
  "code" : "master",
  "id" : "1001",
  "data": "legend"
}

POST discuss-term-lookup/_doc
{
  "code" : "ally",
  "id" : "1002",
  "data": "helps the rebellion"
}

PUT /_enrich/policy/discuss-term-lookup
{
  "match": {
    "indices": "discuss-term-lookup",
    "match_field": "code",
    "enrich_fields": ["data", "id"]
  }
}


POST /_enrich/policy/discuss-term-lookup/_execute

PUT _ingest/pipeline/discuss-term-lookup
{
  "processors": [
    {
      "enrich": {
        "policy_name": "discuss-term-lookup",
        "field": "code",
        "target_field": "enriched",
        "max_matches" : 10
      }
    },
    {
      "drop": {
        "if": "ctx?.enriched == null"
        
      }
    }
    
  ]
}

POST _ingest/pipeline/discuss-term-lookup/_simulate
{
  "docs": [
    {
      "_source": {
        "foo": "luke",
        "code": "master"
      }
    },
    {
      "_source": {
        "foo": "han",
        "code": "ally"
      }
    },
    {
      "_source": {
        "foo": "mando",
        "code": "mandalorian"
      }
    }
  ]
}


DELETE discuss-dest

PUT discuss-dest/
{
  "mappings": {
    "properties": {
      "foo": {
        "type": "keyword"
      },
      "code": {
        "type": "keyword"
      },
      "enriched": {
        "properties": {
          "code": {
            "type": "keyword"
          },
          "id": {
            "type": "keyword"
          },
          "data": {
            "type": "keyword"
          }
        }
      }
    }
  }
}


POST _reindex
{
  "source": {
    "index": "discuss-source"
  },
  "dest": {
    "index": "discuss-dest",
    "pipeline": "discuss-term-lookup"
  }
}

GET discuss-dest/_search

I'm not completely sure I understand the process. However, I'm also not sure it is what I want to do. Let me use an example to explain it better.

I have the following source index.

PUT source_index/
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name": {
        "type": "keyword"
      }
    }
  }
}


POST source_index/_doc
{
  "id" : "1000",
  "name": "yoda"
}

POST source_index/_doc
{
  "id" : "1001",
  "name": "luke"
}

POST source_index/_doc
{
  "id" : "1001",
  "name": "leia"
}

POST source_index/_doc
{
  "id" : "1002",
  "name": "han"
}

Let's assume that my terms are ["1001", "1002"]. In my destination index I need to have the following documents.

{
  "id" : "1001",
  "name": "luke"
},
{
  "id" : "1001",
  "name": "leia"
},
{
  "id" : "1002",
  "name": "han"
}

Sure, that is easy.
I am not sure if you are being literal in showing the terms as an array... you would just need to put them in an index...

So your example...

PUT source_index/
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name": {
        "type": "keyword"
      }
    }
  }
}


POST source_index/_doc
{
  "id" : "1000",
  "name": "yoda"
}

POST source_index/_doc
{
  "id" : "1001",
  "name": "luke"
}

POST source_index/_doc
{
  "id" : "1001",
  "name": "leia"
}

POST source_index/_doc
{
  "id" : "1002",
  "name": "han"
}

DELETE source-term-lookup

PUT source-term-lookup/
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      }
    }
  }
}


POST source-term-lookup/_doc
{
  "id" : "1001"
}

POST source-term-lookup/_doc
{
  "id" : "1002"
}

PUT /_enrich/policy/source-term-lookup
{
  "match": {
    "indices": "source-term-lookup",
    "match_field": "id",
    "enrich_fields": ["id"]
  }
}


POST /_enrich/policy/source-term-lookup/_execute


PUT _ingest/pipeline/source-term-lookup
{
  "processors": [
    {
      "enrich": {
        "policy_name": "source-term-lookup",
        "field": "id",
        "target_field": "enriched",
        "max_matches" : 10
      }
    },
    {
      "drop": {
        "if": "ctx?.enriched == null"
        
      }
    },
    {
      "remove": {
        "field": "enriched"
      }
    }
    
  ]
}

POST _ingest/pipeline/source-term-lookup/_simulate
{
  "docs": [
    {
      "_source": {
        "id": "1000",
        "name": "yoda"
      }
    },
    {
      "_source": {
        "id": "1001",
        "name": "leia"
      }
    },
    {
      "_source": {
        "id": "1002",
        "name": "han"
      }
    }
  ]
}

DELETE dest_index

PUT dest_index/
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name": {
        "type": "keyword"
      }
    }
  }
}


POST _reindex
{
  "source": {
    "index": "source_index"
  },
  "dest": {
    "index": "dest_index",
    "pipeline": "source-term-lookup"
  }
}

GET dest_index/_search

# result

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "dest_index",
        "_id": "N4BH3YcBTO7Ar3aFxmFv",
        "_score": 1,
        "_source": {
          "name": "luke",
          "id": "1001"
        }
      },
      {
        "_index": "dest_index",
        "_id": "OIBH3YcBTO7Ar3aFxmF_",
        "_score": 1,
        "_source": {
          "name": "leia",
          "id": "1001"
        }
      },
      {
        "_index": "dest_index",
        "_id": "OYBH3YcBTO7Ar3aFxmGN",
        "_score": 1,
        "_source": {
          "name": "han",
          "id": "1002"
        }
      }
    ]
  }
}
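One practical note: with around one million IDs you would load the source-term-lookup index with the _bulk API (or the Python bulk helpers) rather than one _doc request per ID; a minimal sketch with the same two IDs from the example:

POST source-term-lookup/_bulk
{ "index": {} }
{ "id": "1001" }
{ "index": {} }
{ "id": "1002" }

Remember to re-run POST /_enrich/policy/source-term-lookup/_execute whenever the lookup index changes, since the enrich index is a snapshot built at execute time.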

Thanks! Now I can reindex in a reasonable time (actually, it finished in about an hour). My new index contains about half as many documents as my original index, but the storage size is similar. I'm using the same mapping for both indices. I'm not sure whether this is related to the reindexing. Do you have any insights about this?

Wow! Good to hear...

The new index will probably shrink after some of the background merging happens.

You can run a force merge on it if you'd like, and that will most likely shrink it as well.

This is good to do if the data is not going to change anyway... it also makes queries more efficient.
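A force merge looks roughly like this (dest_index is the destination index name from your example; max_num_segments=1 is optional and only advisable once the index is no longer being written to):

POST dest_index/_forcemerge?max_num_segments=1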

Thanks, I'll try that when the reindexing process ends. I launched the same process again and it hasn't finished yet. I'm not sure whether it isn't filtering or my criteria are too general; the number of documents in the reindexed index is getting close to that of the original index.

Could you give me some reference values based on your previous experience?
The reindex process didn't finish. Could you give me a reference for the number of documents and the time it took?

What was the error? What do you mean it did not finish? Did you run it async, and did the task finish?

Reindex performance is a case-by-case thing, depending on the cluster and its resources (CPU, RAM, disk space and type), and on whether the cluster is busy...
In general, 150M documents is not gigantic; I know for sure I have done that, I just don't have the numbers at hand.
It tends to be linear, so if you run it and measure how long the first 1M documents take, you should be able to estimate the total time.
There are many ways to make it more efficient:
  1. Disable replicas
  2. Run it async / in the background
  3. Use slicing
Take a look at the options here.

Can you provide

GET _cat/indices/source_index?v

Sorry, I should have been clearer. I ran the reindex process with the wait_for_completion parameter set to true. I finally had to stop the reindex because it had run for roughly 3 hours, the number of documents was nearly 32 million of the original index's 50 million, and it was using twice the storage.

I'll check the link.

Sure, I get

health status index                  uuid         pri rep docs.count docs.deleted store.size pri.store.size
yellow open   source_index 1usrt3iQTQOugp7sD97xAQ   1   1  116368562            0     10.8gb         10.8gb

When I do GET source_index/_count I get about 50 million documents. I think the difference between this number and the docs.count shown here is due to the mapping: I have a nested field of variable length.

Just to give some additional information, my cluster consists of a single node.

Set the replicas to 0 on the destination index (it's an index setting, not part of the mapping).

Run in the background, i.e.
wait_for_completion=false
and record the task id, etc.

So it may take 4 hours.
You need to make sure you have disk space and CPU cycles.

If you have spare CPU cycles you can also use slicing; a sketch of these steps follows below.
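Putting that together, a rough sketch (index and pipeline names taken from your example; slices=auto is one reasonable choice, not the only one):

PUT dest_index/_settings
{
  "index": {
    "number_of_replicas": 0
  }
}

POST _reindex?wait_for_completion=false&slices=auto
{
  "source": {
    "index": "source_index"
  },
  "dest": {
    "index": "dest_index",
    "pipeline": "source-term-lookup"
  }
}

With wait_for_completion=false the _reindex response returns a "task" id, which you can poll for progress via the _tasks API (GET _tasks/ followed by that id).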

Ahh, nested fields will probably take a bit longer.

Nothing magic, it just needs time and space.

Then run a force merge afterwards.

If you have a term you can query on, you could do successive reindex runs and force merge after each.
