Deduplication of records with deletion code

Hey Folks,

I recently had an issue with an AWS load balancer which resulted in millions of duplicate documents being indexed.
I found some resources that helped with writing the query to find duplicates (here), but nothing that would go through and delete those records, so I wanted to share my resulting code here for anyone with the same issue.

Here is the query I wrote:

  "size": 0,
    "aggs": {
      "duplicateCount": {
        "terms": {
          "script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
          "size": 1000,
          "min_doc_count": 2
        },
        "aggs": {
          "duplicateDocuments": {
            "top_hits": {
                "size" : 10
            }
          }
        }
      }
    }
  }

This will find up to 1000 keys that have at least one duplicate, and for each key it returns up to 10 of the matching documents (the original plus up to 9 duplicates).
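
If you want to preview what the aggregation finds before deleting anything, here is a minimal sketch that runs the query once and prints each duplicate key with its count. It assumes the same localhost:9200 cluster and index name used in the full script below; adjust those to your setup.

from elasticsearch import Elasticsearch

# Assumed connection details -- same as in the deletion script below.
es = Elasticsearch(["http://localhost"], port=9200)
index = "eventlog-2018.08"

query = {
    "size": 0,
    "aggs": {
        "duplicateCount": {
            "terms": {
                "script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
                "size": 1000,
                "min_doc_count": 2
            },
            "aggs": {
                "duplicateDocuments": {"top_hits": {"size": 10}}
            }
        }
    }
}

results = es.search(index=index, body=query, request_timeout=120)

# Each bucket key is the concatenated field values; doc_count is how many copies exist.
for bucket in results["aggregations"]["duplicateCount"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])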

I turned that into a Python script that basically runs until the query returns zero results:

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import datetime
import time
import sys

url     = "http://localhost"
port    = 9200
index   = "eventlog-2018.08"

es = Elasticsearch([url], port=port)

# Same aggregation as above: group documents by the concatenation of the
# fields that identify a duplicate, keeping only groups with 2 or more documents.
query = {
    "size": 0,
    "aggs": {
        "duplicateCount": {
            "terms": {
                "script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
                "size": 1000,
                "min_doc_count": 2
            },
            "aggs": {
                "duplicateDocuments": {
                    "top_hits": {
                        "size": 10
                    }
                }
            }
        }
    }
}

clean = False

while not clean:

    print(str(datetime.datetime.now()) + ": Running Query")
    sys.stdout.flush()

    results = es.search(index=index, body=query, request_timeout=120)

    # Keep an audit trail of which documents were deleted and which were kept.
    delete_records  = open('delete_records.txt', 'a', 10)
    skipped_records = open('skipped_records.txt', 'a', 10)

    if len(results['aggregations']['duplicateCount']['buckets']) > 0:
        buckets = results['aggregations']['duplicateCount']['buckets']

        ids_to_be_deleted = []

        for bucket in buckets:

            documents = iter(bucket['duplicateDocuments']['hits']['hits'])

            # Keep the first document in each bucket and delete the rest.
            good_record = next(documents)
            skipped_records.write(good_record["_id"] + "\n")

            for document in documents:

                doc_id = document["_id"]
                remove_doc = {
                    '_op_type'  : 'delete',
                    '_index'    : document["_index"],
                    '_type'     : document["_type"],
                    '_id'       : doc_id
                }

                ids_to_be_deleted.append(remove_doc)
                delete_records.write(doc_id + "\n")

        count = str(len(ids_to_be_deleted))
        print("\t\t" + str(datetime.datetime.now()) + ": Query completed# " + count)
        sys.stdout.flush()

        # parallel_bulk returns a generator, so iterate over it to drive the deletes.
        deletes = helpers.parallel_bulk(es, ids_to_be_deleted)

        for item in deletes:
            pass

        print("\t\t" + str(datetime.datetime.now()) + ": Completed")
        sys.stdout.flush()

        time.sleep(10)

    else:
        clean = True

    # close the record files
    delete_records.close()
    skipped_records.close()

print(str(datetime.datetime.now()) + ": Completed")

I then loaded it onto a CentOS box and had it run in the background until it completed:

nohup python3.6 main.py &

Hope this helps. Also, if anyone has any improvements to suggest, I'm all ears.
