Hello,
I have an index of this type:
{
    "email": email,
    "data": {
        domain: [{
            "purchase_date": purchase_date,
            "amount": amount
        }]
    }
}
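For example, with a sample domain key, a stored document would look like this (all the values here are just placeholders):

{
    "email": "john@example.com",
    "data": {
        "example.com": [
            {
                "purchase_date": "2017-01-01",
                "amount": 100
            }
        ]
    }
}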
And this is the Python method that inserts the data into Elasticsearch:
from elasticsearch import Elasticsearch

es = Elasticsearch()

# insert each line of the CSV file into Elasticsearch
# (index_param, doctype_param and domain are defined elsewhere in my script)
def line2json(input_file):
with open(input_file) as f:
for line in f:
line = line.rstrip()
fields = line.split(',')
email = fields[0]
purchase_date = fields[1]
amount = fields[2]
# 1: check if mail exists
mailExists = es.exists(index=index_param, doc_type=doctype_param, id=email)
            # if the mail does not exist => insert the entire doc
            if not mailExists:
doc = {
"email": email,
"data": {
domain: [{
"purchase_date": date,
"amount": amount
}]
}
}
res = es.index(index=index_param, doc_type=doctype_param, id=email, body=doc)
            # 2: check whether the domain already exists for this mail
else:
query = es.get(index=index_param, doc_type=doctype_param, id=email)
                # save the JSON source into mydata
mydata = query['_source']['data']
# if domain exists => check if 'purchase_date' is the same as the one I'm trying to insert
if domain in mydata:
                    differentPurchaseDate = True
                    for element in mydata[domain]:
                        if element['purchase_date'] == purchase_date:
                            differentPurchaseDate = False
                            break
                    # if 'purchase_date' does not exist yet => add it to the current domain
                    if differentPurchaseDate:
es.update(index=index_param, doc_type=doctype_param, id=email,
body={
"script": {
"inline":"ctx._source.data['"+domain+"'].add(params.newPurchaseDate)",
"params":{
"newPurchaseDate": {
"purchase_date": purchase_date,
"amount": amount
}
}
}
})
                # domain does not exist yet => add the entire domain
                else:
es.update(index=index_param, doc_type=doctype_param, id=email,
body={
"script": {
"inline":"ctx._source.data['"+domain+"'] = params.newDomain",
"params":{
"newDomain": [{
"purchase_date": purchase_date,
"amount": amount
}]
}
}
})
The problem is that with this algorithm every inserted line takes about 50 seconds (I suppose because each line needs up to three separate requests: exists, then get, then update), and I am working with very large files.
So I thought: would it be possible to reduce the import time by using a bulk insert for each file and then removing the duplicates after each file has been processed?
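Something like this is what I have in mind: a minimal sketch using the bulk helper from the elasticsearch-py library (index_param, doctype_param and domain are the same variables as in the script above, and the CSV format is the same one my line2json reads):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

def bulk_file(input_file):
    # one action per CSV line; reusing the email as _id means a later
    # line with the same email overwrites the earlier document instead
    # of merging into it, so the deduplication/merge step would still
    # be needed afterwards
    def actions():
        with open(input_file) as f:
            for line in f:
                email, purchase_date, amount = line.rstrip().split(',')
                yield {
                    "_index": index_param,
                    "_type": doctype_param,
                    "_id": email,
                    "_source": {
                        "email": email,
                        "data": {
                            domain: [{
                                "purchase_date": purchase_date,
                                "amount": amount
                            }]
                        }
                    }
                }

    helpers.bulk(es, actions())

That would replace the per-line exists/get/update round trips with batched _bulk requests per file, at the cost of losing the merge logic during import.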
Thanks!