Deduplication of records with deletion code


(Jeremy Foran) #1

Hey Folks,

I recently had an issue with an AWS load balancer which resulted in millions of duplicated documents being indexed.
I found some resources that helped with writing the query to find duplicates (here) but nothing to actually go through and delete those records, so I wanted to share my resulting code here for anyone with the same issue.

Here is the query I wrote:

  "size": 0,
    "aggs": {
      "duplicateCount": {
        "terms": {
          "script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
          "size": 1000,
          "min_doc_count": 2
        },
        "aggs": {
          "duplicateDocuments": {
            "top_hits": {
                "size" : 10
            }
          }
        }
      }
    }
  }

This will find up to 1000 document groups that have at least one duplicate, returning up to 10 copies per group (the original plus up to 9 duplicates).
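If you just want to see how bad the duplication is before deleting anything, the aggregation can be run on its own and the bucket counts inspected. Here is a minimal sketch using the Python client (the connection details and index name are just the ones from my setup, adjust for yours):

from elasticsearch import Elasticsearch

# Hypothetical connection details; change these for your cluster.
es = Elasticsearch(["http://localhost"], port=9200)
index = "eventlog-2018.08"

# Same aggregation as above: group documents by a concatenated fingerprint
# and only keep groups that contain more than one document.
query = {
    "size": 0,
    "aggs": {
        "duplicateCount": {
            "terms": {
                "script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
                "size": 1000,
                "min_doc_count": 2
            },
            "aggs": {
                "duplicateDocuments": {
                    "top_hits": {"size": 10}
                }
            }
        }
    }
}

results = es.search(index=index, body=query, request_timeout=120)
buckets = results["aggregations"]["duplicateCount"]["buckets"]

print("duplicate groups found:", len(buckets))
for bucket in buckets[:5]:
    # doc_count is the total number of copies sharing that fingerprint
    print(bucket["key"], "->", bucket["doc_count"], "copies")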

I then turned the query into a Python script which basically runs in a loop until the query returns zero results:

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import datetime
import time
import sys

url     = "http://localhost"
port    = 9200
index   = "eventlog-2018.08"

es = Elasticsearch([url], port=port)

# Same duplicate-finding aggregation as above.
query = {
    "size": 0,
    "aggs": {
        "duplicateCount": {
            "terms": {
                "script": "doc['id'].value + doc['event'].value + doc['@timestamp'].value + doc['status'].value",
                "size": 1000,
                "min_doc_count": 2
            },
            "aggs": {
                "duplicateDocuments": {
                    "top_hits": {
                        "size": 10
                    }
                }
            }
        }
    }
}

clean = False

while not clean:

    print(str(datetime.datetime.now()) + ": Running Query")
    sys.stdout.flush()

    results = es.search(index=index, body=query, request_timeout=120)

    # keep a record of which documents were deleted and which were kept
    delete_records  = open('delete_records.txt', 'a', 10)
    skipped_records = open('skipped_records.txt', 'a', 10)

    if len(results['aggregations']['duplicateCount']['buckets']) > 0:
        buckets = results['aggregations']['duplicateCount']['buckets']

        ids_to_be_deleted = []

        for bucket in buckets:

            documents = iter(bucket['duplicateDocuments']['hits']['hits'])

            # keep the first document in each group and delete the rest
            good_record = next(documents)

            skipped_records.write(good_record["_id"] + "\n")

            for document in documents:

                id = document["_id"]
                remove_doc = {
                    '_op_type'  : 'delete',
                    '_index'    : index,   # delete from the same index we queried
                    '_type'     : 'eventlog',
                    '_id'       : id
                }

                ids_to_be_deleted.append(remove_doc)
                delete_records.write(id + "\n")

        count = str(len(ids_to_be_deleted))
        print("\t\t" + str(datetime.datetime.now()) + ": Query completed# " + count)
        sys.stdout.flush()

        deletes = helpers.parallel_bulk(es, ids_to_be_deleted)

        # drain the generator so the bulk deletes are actually executed
        for item in deletes:
            pass

        print("\t\t" + str(datetime.datetime.now()) + ": Completed")
        sys.stdout.flush()

        # close the records
        delete_records.close()
        skipped_records.close()

        time.sleep(10)

    else:
        # nothing left to delete; close the records and stop looping
        delete_records.close()
        skipped_records.close()
        clean = True

print(str(datetime.datetime.now()) + ": Completed")
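One tweak I have been considering (just a sketch, not what I actually ran): instead of draining the parallel_bulk generator with pass, you can check the (ok, info) tuples it yields so any failed deletes get logged, passing raise_on_error=False so a single failure doesn't raise mid-run:

from elasticsearch import helpers

# Sketch only: assumes the same `es` client and `ids_to_be_deleted` list built above.
for ok, info in helpers.parallel_bulk(es, ids_to_be_deleted, raise_on_error=False):
    if not ok:
        # info holds the item-level response for the failed delete
        print("delete failed: " + str(info))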

I then loaded it onto a CentOS box and let it run in the background until it completed:

nohup python3.6 main.py &

Hope this helps. Also, if anyone has any improvements to suggest, I'm all ears.

