Remove duplicates after bulk insert exploiting ES power

Hello,
I have an index whose documents look like this:

{
	"email": email,
	"data": {
		domain: [{
			"purchase_date": date,
			"amount": amount
		}]
	}
}
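
For example, after a couple of purchases for the same email a stored document (whose _id is the email itself) could look like this (all values are made up):

{
	"email": "john@example.com",
	"data": {
		"examplesite.com": [{
			"purchase_date": "2017-03-01",
			"amount": 20
		}, {
			"purchase_date": "2017-04-15",
			"amount": 35
		}]
	}
}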

And this is my Python method which inserts data into ES:

# insert data into ES, one line at a time
# ('es', 'index_param', 'doctype_param' and 'domain' are defined elsewhere in my script)
def line2json(input_file):
	with open(input_file) as f:
		for line in f:
			line = line.rstrip()
			
			fields = line.split(',')
			email = fields[0]
			purchase_date = fields[1]
			amount = fields[2]

			# 1: check if mail exists
			mailExists = es.exists(index=index_param, doc_type=doctype_param, id=email)

			# if the email does not exist yet => insert the entire document
			if not mailExists:
				doc = {
					"email": email,
					"data": {
						domain: [{
							"purchase_date": date,
							"amount": amount
						}]
					}
				}

				res = es.index(index=index_param, doc_type=doctype_param, id=email, body=doc)
			# 2: the email already exists => check whether this domain is already in its data
			else:
				query = es.get(index=index_param, doc_type=doctype_param, id=email)
				# save json content into mydata
				mydata = query['_source']['data']

				# if the domain exists => check whether the 'purchase_date' I'm trying to insert is already there
				if domain in mydata:
					differentPurchaseDate = True
					for element in mydata[domain]:
						if element['purchase_date'] == purchase_date:
							differentPurchaseDate = False
							break
					# if this 'purchase_date' is not there yet => add it to the current domain
					if differentPurchaseDate:
						es.update(index=index_param, doc_type=doctype_param, id=email,
							 body={
								"script": {
							 		"inline":"ctx._source.data['"+domain+"'].add(params.newPurchaseDate)",
							 		"params":{
										"newPurchaseDate": {
											"purchase_date": purchase_date, 
											"amount": amount
										}
								}
							}
						})
				
				# the domain is not there yet => add it with its first entry
				else:
					es.update(index=index_param, doc_type=doctype_param, id=email,
					 body={
				 		"script": {
					 		"inline":"ctx._source.data['"+domain+"'] = params.newDomain",
					 		"params":{
								"newDomain": [{
									"purchase_date": purchase_date, 
									"amount": amount
								}]
							}
						}
					})

The problem is that with this algorithm every new line takes about 50 seconds to insert, and I am working with very large files.
So I thought: is it possible to reduce the import time by doing a bulk insert for each file and removing the duplicates after each file has been processed?
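
To make the idea concrete, this is a rough sketch of what I have in mind, using the bulk helper of the Python client (helpers.bulk) and reusing es, index_param and doctype_param from above; I'm also assuming here that the domain for a given file is known and passed in by the caller:

from elasticsearch import helpers

# bulk-load one file, de-duplicating in memory before sending anything to ES
def file2bulk(input_file, domain):
	docs = {}
	with open(input_file) as f:
		for line in f:
			email, purchase_date, amount = line.rstrip().split(',')
			doc = docs.setdefault(email, {"email": email, "data": {domain: []}})
			entries = doc["data"][domain]
			# skip lines whose 'purchase_date' was already seen for this email in this file
			if not any(e["purchase_date"] == purchase_date for e in entries):
				entries.append({"purchase_date": purchase_date, "amount": amount})

	# one bulk request for the whole file instead of one round trip per line
	actions = ({
		"_op_type": "index",
		"_index": index_param,
		"_type": doctype_param,
		"_id": email,
		"_source": doc
	} for email, doc in docs.items())
	helpers.bulk(es, actions)

This only removes duplicates within a single file, and it would overwrite a document that an earlier file created for the same email, so the duplicates across files would still have to be removed afterwards; that is the part I don't know how to do efficiently.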
Thanks!
