Hi @Mark_Harwood
I've just found a bug in the ESEntityCentricIndexing.py when building an Entity Centric Index for a colleague. There is an error where the last identifier is not written to the Entity Index.
In the "finish up the last batch" section, we need to add an action for the last identifier's events before flushing the outstanding actions. I have amended your script below. It's not elegant, but you could clean it up. Thought you might want to update the example so others aren't caught out.
Thanks
Simon
from elasticsearch import helpers
from elasticsearch.client import Elasticsearch
import time
import argparse
import json
# This is a generic script to scroll information from one event-centric index sorted
# by entity ID (e.g. a person) and bundle related events in update calls to an entity
# centric index which summarises behaviours over time
parser = argparse.ArgumentParser()
parser.add_argument("eventIndexName", help="The name of the index which will receive events")
parser.add_argument("eventDocType", help="Name of the event type")
parser.add_argument("eventQueryFile", help="The JSON file with the query for events")
parser.add_argument("entityIdField", help="The name of the field with the entity ID")
parser.add_argument("entityIndexName", help="The name of the index which will build entities")
parser.add_argument("entityDocType", help="Name of the entity type")
parser.add_argument("updateScriptFile", help="The script file used to update entities with events")
parser.add_argument("-actionsPerBulk", help="The number of records to send in each bulk request", type=int, default=5000)
parser.add_argument("-eventsPerScrollPage", help="The number of events per scroll page (small=slow)", type=int, default=5000)
parser.add_argument("-maxTimeToProcessScrollPage", help="The max time to process page of events", default="1m")
parser.add_argument("-scriptMode", help="The mode parameter passed to the script", default="fullBuild")
args = parser.parse_args()

# Load the query that selects (and, crucially, sorts by entity ID) the events
# to be consolidated. A context manager closes the file handle promptly
# instead of leaking it until garbage collection.
with open(args.eventQueryFile) as queryFile:
    eventsQuery = json.loads(queryFile.read())
# Reads docs from event-centric index and consolidates in entity-centric index
start = time.time()
# NOTE(review): connects to a default localhost:9200 cluster — no host option is exposed.
es = Elasticsearch()
# Open a scroll over the event index; the query is expected to sort by the
# entity ID field so that all events for one entity arrive consecutively.
response = es.search(index=args.eventIndexName, doc_type=args.eventDocType,
body=eventsQuery, size=args.eventsPerScrollPage,
scroll=args.maxTimeToProcessScrollPage)
scrollId = response['_scroll_id']
# Running state shared with the scroll loop below:
numDocsProcessed = 0
# ID of the entity whose events are currently being accumulated ("" = none yet).
lastEntityId=""
# Events buffered for the current entity.
events=[]
# Pending bulk-update actions, flushed when actionsPerBulk is reached.
actions = []
while (1):
try:
hits = response["hits"]["hits"]
if len(hits) ==0:
break;
for doc in hits:
numDocsProcessed += 1
if numDocsProcessed % 10000 ==0:
elapsedSecs = int(time.time() - start)
dps = numDocsProcessed/elapsedSecs
print numDocsProcessed, "docs per second=",dps
thisEntityId=doc["_source"][args.entityIdField]
if thisEntityId != lastEntityId:
if len(events)>0:
# Package the bundle of content from the same entity to be collapsed
action = {
"_index": args.entityIndexName,
'_op_type': 'update',
"_type": args.entityDocType,
"_id": lastEntityId,
# In future versions of ES >1.4.4
# "script_file": args.updateScriptFile,
# In versions of ES where dynamic scripting is enabled
# you can refer to indexed scripts by ID:
# "script_id": args.updateScriptId,
"script": args.updateScriptFile,
"scripted_upsert":True,
"params": {
"scriptMode": args.scriptMode,
"events":list(events)
},
"upsert" : {
# Use a blank document because script does all the initialization
}
}
actions.append(action)
# Flush bulk indexing action if necessary
if len(actions)>=args.actionsPerBulk:
result=helpers.bulk(es, actions)
# TO check result for failures and take appropriate action
del actions[0:len(actions)]
del events[0:len(events)]
lastEntityId=thisEntityId
events.append(doc["_source"])
response = es.scroll(scroll_id=scrollId, scroll=args.maxTimeToProcessScrollPage)
except:
break
#Tidy up the scroll ID
es.clear_scroll(scroll_id=scrollId)
# Finish up the last batch
if len(events) > 0:
# Package the bundle of content from the same entity to be collapsed
action = {
"_index": args.entityIndexName,
'_op_type': 'update',
"_type": args.entityDocType,
"_id": lastEntityId,
# In future versions of ES >1.4.4
# "script_file": args.updateScriptFile,
# In versions of ES where dynamic scripting is enabled
# you can refer to indexed scripts by ID:
# "script_id": args.updateScriptId,
"script": args.updateScriptFile,
"scripted_upsert":True,
"params": {
"scriptMode": args.scriptMode,
"events":list(events)
},
"upsert" : {
# Use a blank document because script does all the initialization
}
}
actions.append(action)
# Flush bulk indexing action if necessary
if (len(actions) > 0):
helpers.bulk(es, actions)
elapsed = (time.time() - start)
print "processed ", numDocsProcessed, "docs", "time=",elapsed