I read this thread - http://elasticsearch-users.115913.n3.nabble.com/reindexing-via-scan-search-type-td3256635.html#a3259255
and have a few more questions.
I'm using the pyes (0.16.0) client running against an ES server (0.19.x).
The installation I am working on has 3 different indices, the biggest of which has about 50M docs. There are 3 nodes, each hosting all 3 indices plus replicas, i.e. each index is replicated to the other 2 nodes, with 5 shards per index.
The installation is PROD, serving live read/write traffic. Because the business SLA has changed and a number of fields have been deprecated, I would like to 'remove' these fields and 'compact' the indices without changing the underlying mapping. Rather than building a new mapping and indexing from scratch, I was hoping to do a 'rolling' reindex of 'dehydrated' docs.
Python code:
batch_size = 5
n = 0
while True:
    docs = scan_index(start=n, size=batch_size)  # get next batch of docs from index
    if not docs:
        break
    n += len(docs)
    new_docs = dehydrate(docs)  # create new doc out of old doc sans deprecated fields
    reindex(new_docs)           # reindex new docs. This involves a delete(), followed by index()
    time.sleep(0.5)
From the previous thread, I'm aware of potential issues with deletes, but deletes are necessary before the reindex can take place.
To test my 'dehydrate' Python code, I first created test indices of about 50 docs with the old mapping. A run with size=10 would simulate 5 batch reads/scans and 5 batch reindexes.
Problem (which I kind of anticipated):
a) scan_index() 'misses' some docs. However, multiple runs will eventually catch most, if not all, docs.
I suspect my 'scan' logic is faulty and would appreciate comments. Please bear with me through an abridged copy of my Python/pyes client code:
from pyes import ES

def scan_es(start, size):
    connection = ES(['localhost:9200'])
    query = {'query': {'match_all': {}}, 'from': start, 'size': size}
    resultset = connection.search(query, indexes=['test_index'], scan=True)
    for r in resultset['hits']['hits']:
        yield r['_source']

def scan_index(start, size):
    docs = []
    for d in scan_es(start, size):
        docs.append(d)
    return docs

def reindex(docs):
    connection = ES(['localhost:9200'])
    for doc in docs:
        try:
            # make sure the doc still exists, then delete and re-index it under the same id
            connection.get('test_index', 'test_doc', doc['id'], fields=['_id'])
            connection.delete('test_index', 'test_doc', doc['id'])
            connection.index(doc, 'test_index', 'test_doc', doc['id'])
        except:
            # note: bare except silently swallows any get/delete/index failure
            pass
Questions:
- Each batch invokes a new call to scan_index/scan_es. How does the scroll_id get managed? I looked at the pyes code on GitHub and 'scan=True' seems to manage the scroll_id automagically, but I didn't dig deep enough to figure out whether it gets reset on each search() call.
- There is a pyes search_scroll() API, but I have no idea how to manage the scroll_id myself. Advice please. (I've put a rough sketch of what I think the raw scan/scroll flow looks like after these questions.)
- The rolling reindex is expected to take a few days, which is intentional so as not to slam the PROD nodes. I can only increase batch_size to maybe 10,000 and still be nice to the nodes.
- What would be a more elegant way of achieving this?
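For reference, here is my rough, untested sketch of the raw scan/scroll HTTP flow (bypassing pyes entirely), just to show where I think the scroll_id has to live between batches. It assumes the same localhost:9200 node and 'test_index' from the code above, Python 2 stdlib (urllib2/json), and a hypothetical process_batch callback standing in for dehydrate() + reindex(). Please correct me if I have the flow wrong.

import json
import urllib2

def scan_all(process_batch, size=50, scroll_ttl='10m'):
    # 1. Open the scan: with search_type=scan this returns a scroll_id but no hits yet.
    url = ('http://localhost:9200/test_index/_search'
           '?search_type=scan&scroll=%s&size=%d' % (scroll_ttl, size))
    body = json.dumps({'query': {'match_all': {}}})
    resp = json.load(urllib2.urlopen(url, body))
    scroll_id = resp['_scroll_id']

    while True:
        # 2. Pull the next batch, passing the scroll_id as the request body.
        #    Each response hands back a fresh scroll_id for the following call.
        scroll_url = 'http://localhost:9200/_search/scroll?scroll=%s' % scroll_ttl
        resp = json.load(urllib2.urlopen(scroll_url, scroll_id))
        hits = resp['hits']['hits']
        if not hits:
            break  # the scroll is exhausted
        scroll_id = resp['_scroll_id']
        process_batch([h['_source'] for h in hits])

If I understand the docs correctly, with search_type=scan the size applies per shard (so with 5 shards, size=50 means up to 250 docs per batch), and the scroll sees a snapshot of the index as of when the scan was opened, which is what I'm hoping avoids the missed docs I see with from/size.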