Hey Folks,
I recently had an issue with an AWS load balancer which resulted in millions of duplicated documents.
I found some resources that helped with writing the query to find duplicates (here), but nothing that actually goes through and deletes those records. I wanted to share my resulting code here for anyone with the same issue.
Here is the query I wrote:
"size": 0,
"aggs": {
"duplicateCount": {
"terms": {
"script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
"size": 1000,
"min_doc_count": 2
},
"aggs": {
"duplicateDocuments": {
"top_hits": {
"size" : 10
}
}
}
}
}
}
This will find up to 1,000 groups of documents that have at least one duplicate, and return up to 10 hits for each group (the original plus up to 9 duplicates).
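If you want to eyeball the duplicate groups before deleting anything, a quick sketch like the one below just prints each group's key and document count. It assumes the same local cluster and eventlog-2018.08 index used in the script further down, so adjust the URL, port, and index for your setup.

from elasticsearch import Elasticsearch

# assumes a local cluster and the same index as the cleanup script below
es = Elasticsearch(["http://localhost"], port=9200)

preview_query = {
    "size": 0,
    "aggs": {
        "duplicateCount": {
            "terms": {
                "script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
                "size": 1000,
                "min_doc_count": 2
            }
        }
    }
}

results = es.search(index="eventlog-2018.08", body=preview_query, request_timeout=120)
for bucket in results['aggregations']['duplicateCount']['buckets']:
    # 'key' is the concatenated field values, 'doc_count' is how many copies exist
    print(bucket['key'], bucket['doc_count'])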
I turned that into a Python script which basically runs until the query returns zero results:
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import datetime
import time
import sys
url = "http://localhost"
port = 9200
index = "eventlog-2018.08"
es = Elasticsearch([url], port=port)
query = {
    "size": 0,
    "aggs": {
        "duplicateCount": {
            "terms": {
                "script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
                "size": 1000,
                "min_doc_count": 2
            },
            "aggs": {
                "duplicateDocuments": {
                    "top_hits": {
                        "size": 10
                    }
                }
            }
        }
    }
}
clean = False
while not clean:
    print(str(datetime.datetime.now()) + ": Running Query")
    sys.stdout.flush()
    results = es.search(index=index, body=query, request_timeout=120)
    delete_records = open('delete_records.txt', 'a', 10)
    skipped_records = open('skipped_records.txt', 'a', 10)
    if len(results['aggregations']['duplicateCount']['buckets']) > 0:
        buckets = results['aggregations']['duplicateCount']['buckets']
        ids_to_be_deleted = []
        for bucket in buckets:
            documents = iter(bucket['duplicateDocuments']['hits']['hits'])
            # keep the first document in each group and delete the rest
            good_record = next(documents)
            skipped_records.write(good_record["_id"] + "\n")
            for document in documents:
                id = document["_id"]
                remove_doc = {
                    '_op_type': 'delete',
                    '_index': index,
                    '_type': 'eventlog',
                    '_id': id
                }
                ids_to_be_deleted.append(remove_doc)
                delete_records.write(id + "\n")
        count = str(len(ids_to_be_deleted))
        print("\t\t" + str(datetime.datetime.now()) + ": Query completed# " + count)
        sys.stdout.flush()
        # parallel_bulk returns a generator, so iterate over it to actually send the deletes
        deletes = helpers.parallel_bulk(es, ids_to_be_deleted)
        for item in deletes:
            pass
        print("\t\t" + str(datetime.datetime.now()) + ": Completed")
        sys.stdout.flush()
        # close the record files
        delete_records.close()
        skipped_records.close()
        time.sleep(10)
    else:
        clean = True
        print(str(datetime.datetime.now()) + ": Completed")
I then loaded it onto a CentOS box and ran it in the background until it completed:
nohup python3.6 main.py &
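Since nohup captures stdout in nohup.out and the script flushes after each step, you can follow progress with tail -f nohup.out.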
Hope this helps. Also, if anyone has any improvements to suggest, I'm all ears.