How can I use aggregations to query distinct values across all time grouped by first seen

Hi,

I am trying to count unique uuids across all time, grouped by the date each uuid was first seen. I can do this in MySQL using a nested select, as follows.

select firstSeen, count(*) from (
select uuid, date_format(min(createdAt), "%Y-%m-%d") as firstSeen from table group by uuid
) as first group by firstSeen;

This would produce...

2015-01-01     30
2015-01-02     25

I am struggling to replicate this with nested aggregations. Any advice is appreciated.

Thanks

Simon

If you have a lot of data, and especially if the data for each user is sharded on different machines, this is an expensive query for any system.
It is fundamentally a question about users, not log records, and an entity-centric index may be a more appropriate approach: https://www.elastic.co/videos/entity-centric-indexing-mark-harwood

Forgive me if I am misreading your SQL query (feel free to correct me), but as I read it, you want to find out how many unique uuids you have for each createdAt date?

If that's correct, then the following should give you what you need:

{
  "aggs": {
    "createdAt_histogram": {
      "date_histogram": {
        "field": "createdAt",
        "interval": "day"
      },
      "aggs": {
        "count_unique_uuids": {
          "cardinality": {
            "field": "uuid"
          }
        }
      }
    }
  }
}
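
For completeness, here is roughly how that could be run from the Python client and the buckets read back; the index name and the "size": 0 (to suppress the hits) are assumptions on my part, so adjust to your setup:

from elasticsearch import Elasticsearch

es = Elasticsearch()

aggQuery = {
    "size": 0,  # we only want the aggregation buckets, not the hits themselves
    "aggs": {
        "createdAt_histogram": {
            "date_histogram": {"field": "createdAt", "interval": "day"},
            "aggs": {
                "count_unique_uuids": {"cardinality": {"field": "uuid"}}
            }
        }
    }
}

response = es.search(index="events", body=aggQuery)
for bucket in response["aggregations"]["createdAt_histogram"]["buckets"]:
    print bucket["key_as_string"], bucket["count_unique_uuids"]["value"]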

Hope that helps.

Hi Colings,

Almost; I have written that query as well. What I want is all unique uuids across all time, grouped by the date each was first seen, not unique uuids per date. Does that make sense? If I see uuid 1 yesterday and uuid 1 again today, I should only count it once, yesterday, as that was the first time it was seen. The fact that it was seen today doesn't matter to me any more, as I have already counted it.

Mark, thanks for your reply also, I will take a look at entity centric indexing.

Thanks

Simon

Ah, I see. I misunderstood what you were doing. So I think the only way to do this would be to use the approach @Mark_Harwood suggests. You'll see in that video the reasons why this is hard on large datasets in a traditional event-centric index.

@colings86 @Mark_Harwood

I just wanted to follow up on my original email. I am now using entity-centric indexes on my data to answer my query, rebuilding the index at a regular interval. For all intents and purposes this feels like a map/reduce approach to building an index, which is cool.

I have two questions I would like to ask. There is no urgency, as with my current data volumes this is not an issue, but in the future I anticipate some bottlenecks.

  1. I am currently following the demo approach of tearing down my index and rebuilding it completely. I am given a source record and a bundle of related events by the ESEntityCentricIndex python script, which I can reduce down to a single record for the entity. How do I move to an incremental approach? Specifically, what is the best practice for retrieving the existing entity record from the index to update?

  2. I would really like to parallelise this update. Is there a method for pooling all the records for a given entity at a particular node for processing? At the moment, everything is being pulled through one node and my index exists in only one shard.

Finally, as I am tearing down and rebuilding my index, I have successfully used aliases to avoid any outage. I create a new index, point the alias to the new index and delete the old index. This pattern seems to work, as alias reallocation appears to be atomic. Is this the best approach? If I move to incremental updates this won't be an issue, as I will not be destroying anything.
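
For reference, the swap itself can be expressed as a single _aliases call so that both actions are applied together; a minimal sketch, assuming made-up index and alias names:

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Point the alias at the newly built index and detach it from the old one
# in a single _aliases request, so readers never see a missing alias.
es.indices.update_aliases(body={
    "actions": [
        {"remove": {"index": "entities_v1", "alias": "entities"}},
        {"add":    {"index": "entities_v2", "alias": "entities"}}
    ]
})

# Once nothing references the old index any more, it can be dropped.
es.indices.delete(index="entities_v1")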

Thanks again for your help.

Simon

The main difference with incremental is that you would include a date-based filter in your query to the event store to "get the latest events since X" where X is the last time you ran the job.
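
For illustration, the query body could look something like the following; the field names ("uuid", "createdAt"), the lastRunDate value and the ES 1.x "filtered" query syntax are assumptions on my part:

lastRunDate = "2015-06-01T00:00:00"

# Only pull events newer than the last successful run, still sorted by
# entity ID (then date) so that all events for one entity arrive together.
eventsQuery = {
    "query": {
        "filtered": {
            "query": {"match_all": {}},
            "filter": {
                "range": {"createdAt": {"gt": lastRunDate}}
            }
        }
    },
    "sort": [
        {"uuid": "asc"},
        {"createdAt": "asc"}
    ]
}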

If you want to parallelise the "pull" of events, you could run multiple instances of the python script, each running a different query, e.g. the equivalent of this pseudo-SQL:

select * from events where entity_id.hashCode() % numWorkers = 1 and date > lastRunDate order by entity_id, date

"NumWorkers" is the number of python jobs and each would have a different number (1,2,3...). This way they would be ensured to be pulling different subsets of events.
It may also be possible to use the routing part of a query request to target a worker's query at a specific shard but I haven't tried that.
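
One way to get the effect of that pseudo-SQL without a script filter is to partition client-side in each worker; a rough sketch, where the field name, the worker numbering and the use of crc32 are all my own assumptions:

import zlib
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

numWorkers = 4   # how many copies of the python job are running
workerId = 1     # each job is started with a different value: 0, 1, 2, ...

def belongs_to_this_worker(entity_id):
    # Use a deterministic hash (crc32) so every worker partitions entity IDs
    # the same way, regardless of process or Python version.
    return zlib.crc32(entity_id.encode("utf-8")) % numWorkers == workerId

eventsQuery = {"sort": [{"uuid": "asc"}]}  # in practice, the date-filtered query above

for doc in helpers.scan(es, index="events", doc_type="event",
                        query=eventsQuery, preserve_order=True):
    if belongs_to_this_worker(doc["_source"]["uuid"]):
        pass  # bundle these events and update the entity-centric index as before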

One more point.
If you are only interested in maintaining the firstSeen date, and your logs all arrive in chronological order, then you could dispense with the Groovy script in my example and just attempt an insert into your entity-centric index with op_type=create. This would ensure the insert fails if you already have a document for a given UUID.
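
If firstSeen is all you are maintaining, that could be as simple as the sketch below; the index, type and field names are made up:

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConflictError

es = Elasticsearch()

def record_first_seen(uuid, created_at):
    try:
        # op_type=create refuses to overwrite an existing document, so the
        # first event seen for a uuid is the one that sets firstSeen.
        es.index(index="entities", doc_type="entity", id=uuid,
                 op_type="create", body={"firstSeen": created_at})
    except ConflictError:
        pass  # we already have this uuid; nothing to update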

Cheers
Mark

Thanks Mark,

The parallel technique makes sense. This is the same approach Hadoop takes to distributing keys to reducers; I don't know why I didn't think about it like that. Thanks!

Regarding the incremental approach: I understand how this will give me the records I need to merge in, but where do I get the record to merge against?

Pseudocode:

if recordExists
   retrieveCurrentRecord
   iterate over the events and update the existing record
   upsert
else
   create new record
   iterate over the events to populate the entity
   insert
end

The retrieveCurrentRecord bit is what I am confused about. Is the existing record given to me in doc._source?

Thanks

Simon

All taken care of. You are given the current record in doc._source if it exists, and ctx.op tells you whether it is a create operation or not.

Brilliant, thanks Mark.

Hi @Mark_Harwood

I've just found a bug in ESEntityCentricIndexing.py while building an entity-centric index for a colleague: the last identifier is never written to the entity index.

In the "finish up the last batch" section, we need to add an action for the last identifier's events before flushing the outstanding actions. I have amended your script below. It's not elegant, but you could clean it up. I thought you might want to update the example so others aren't caught out.

Thanks

Simon

from elasticsearch import helpers
from elasticsearch.client import Elasticsearch
import time
import argparse
import json

#   This is a generic script to scroll information from one event-centric index sorted 
#   by entity ID (e.g. a person) and bundle related events in update calls to an entity
#   centric index which summarises behaviours over time 

parser = argparse.ArgumentParser()
parser.add_argument("eventIndexName", help="The name of the index which will receive events")
parser.add_argument("eventDocType", help="Name of the event type")
parser.add_argument("eventQueryFile", help="The JSON file with the query for events")
parser.add_argument("entityIdField", help="The name of the field with the entity ID")
parser.add_argument("entityIndexName", help="The name of the index which will build entities")
parser.add_argument("entityDocType", help="Name of the entity type")
parser.add_argument("updateScriptFile", help="The script file used to update entities with events")

parser.add_argument("-actionsPerBulk", help="The number of records to send in each bulk request", type=int, default=5000)
parser.add_argument("-eventsPerScrollPage", help="The number of events per scroll page (small=slow)",  type=int, default=5000)
parser.add_argument("-maxTimeToProcessScrollPage", help="The max time to process page of events",  default="1m")
parser.add_argument("-scriptMode", help="The mode parameter passed to the script", default="fullBuild")
args = parser.parse_args()

json_data=open(args.eventQueryFile).read()
eventsQuery= json.loads(json_data)


# Reads docs from event-centric index and consolidates in entity-centric index
start = time.time()
es = Elasticsearch()
response = es.search(index=args.eventIndexName, doc_type=args.eventDocType,
                     body=eventsQuery,  size=args.eventsPerScrollPage,
                     scroll=args.maxTimeToProcessScrollPage)
scrollId = response['_scroll_id']
numDocsProcessed = 0
lastEntityId=""
events=[]
actions = []
while (1):
    try:
        hits = response["hits"]["hits"]
        if len(hits) ==0:
            break;
        for doc in hits:
            numDocsProcessed += 1
            if numDocsProcessed % 10000 ==0:
                elapsedSecs = int(time.time() - start)
                dps =  numDocsProcessed/elapsedSecs
                print numDocsProcessed, "docs per second=",dps
            thisEntityId=doc["_source"][args.entityIdField]
            if thisEntityId != lastEntityId:
                if len(events)>0:
                    # Package the bundle of content from the same entity to be collapsed
                    action = {
                        "_index": args.entityIndexName,
                        '_op_type': 'update',
                        "_type": args.entityDocType,
                        "_id": lastEntityId,
                         # In future versions of ES >1.4.4
#                        "script_file": args.updateScriptFile,
                         # In versions of ES where dynamic scripting is enabled
                         # you can refer to indexed scripts by ID:
                         # "script_id": args.updateScriptId,
                        "script": args.updateScriptFile,
                        "scripted_upsert":True,
                        "params": {
                            "scriptMode": args.scriptMode,
                            "events":list(events)
                        },
                        "upsert" : {
                           # Use a blank document because script does all the initialization
                        }
                    }
                    actions.append(action)
                    # Flush bulk indexing action if necessary
                    if len(actions)>=args.actionsPerBulk:
                        result=helpers.bulk(es, actions)
                        # TODO: check result for failures and take appropriate action
                        del actions[0:len(actions)]
                del events[0:len(events)]
                lastEntityId=thisEntityId
            events.append(doc["_source"])
        response = es.scroll(scroll_id=scrollId, scroll=args.maxTimeToProcessScrollPage)
    except:
        break
#Tidy up the scroll ID
es.clear_scroll(scroll_id=scrollId)

# Finish up the last batch
if len(events) > 0:
    
    # Package the bundle of content from the same entity to be collapsed
    action = {
        "_index": args.entityIndexName,
        '_op_type': 'update',
        "_type": args.entityDocType,
        "_id": lastEntityId,
         # In future versions of ES >1.4.4
#        "script_file": args.updateScriptFile,
         # In versions of ES where dynamic scripting is enabled
         # you can refer to indexed scripts by ID:
         # "script_id": args.updateScriptId,
        "script": args.updateScriptFile,
        "scripted_upsert":True,
        "params": {
            "scriptMode": args.scriptMode,
            "events":list(events)
        },
        "upsert" : {
           # Use a blank document because script does all the initialization
        }
    }
    actions.append(action)
    # Flush bulk indexing action if necessary
if (len(actions) > 0):
    helpers.bulk(es, actions)

elapsed = (time.time() - start)
print "processed ", numDocsProcessed, "docs", "time=",elapsed

Good catch!
Honza Král has also been kind enough to tidy up my code and, as the author of the elasticsearch Python client, he's in a good position to do so.

His comments were as follows:

Changes are mostly cosmetic (though I didn't do much there), with the exception of:

  1. Switched to using the scan helper to avoid duplicating the scroll_id handling.
  2. I just use one call to the `bulk` helper, since it already contains the logic of splitting actions into chunks based on the number of items.
  3. I don't build the actions list in memory to pass it to bulk (to conserve memory) but instead use a generator, so I am feeding actions into bulk as I get them from the scan helper without storing them (bulk will create the chunks from the stream).
  4. From what I can see, your version of the script was also ignoring the last batch of events, since events were only sent to ES when the entityId differed and not after the loop had finished.

revised code:

from elasticsearch import helpers
from elasticsearch.client import Elasticsearch
import time
import argparse
import json

#   This is a generic script to scroll information from one event-centric index sorted 
#   by entity ID (e.g. a person) and bundle related events in update calls to an entity
#   centric index which summarises behaviours over time 

parser = argparse.ArgumentParser()
parser.add_argument("eventIndexName", help="The name of the index which will receive events")
parser.add_argument("eventDocType", help="Name of the event type")
parser.add_argument("eventQueryFile", help="The JSON file with the query for events")
parser.add_argument("entityIdField", help="The name of the field with the entity ID")
parser.add_argument("entityIndexName", help="The name of the index which will build entities")
parser.add_argument("entityDocType", help="Name of the entity type")
parser.add_argument("updateScriptFile", help="The script file used to update entities with events")

parser.add_argument("-actionsPerBulk", help="The number of records to send in each bulk request", type=int, default=5000)
parser.add_argument("-eventsPerScrollPage", help="The number of events per scroll page (small=slow)",  type=int, default=5000)
parser.add_argument("-maxTimeToProcessScrollPage", help="The max time to process page of events",  default="1m")
parser.add_argument("-scriptMode", help="The mode parameter passed to the script", default="fullBuild")
args = parser.parse_args()

eventsQuery = json.load(open(args.eventQueryFile))
es = Elasticsearch()

def generate_actions():
	def get_action(events):
		return {
			'_op_type': 'update',
			"_id": lastEntityId,
			# In future versions of ES >1.4.4
			# "script_file": args.updateScriptFile,
			# In versions of ES where dynamic scripting is enabled
			# you can refer to indexed scripts by ID:
			# "script_id": args.updateScriptId,
			"script": args.updateScriptFile,
			"scripted_upsert":True,
			"params": {
				"scriptMode": args.scriptMode,
				"events":list(events)
			},
			"upsert" : {
			# Use a blank document because script does all the initialization
			}
		}

	lastEntityId = ""
	events = []
	numDocsProcessed = 0
	start = time.time()
	for doc in helpers.scan(es,
			index=args.eventIndexName,
			doc_type=args.eventDocType,
			query=eventsQuery,
			size=args.eventsPerScrollPage,
			scroll=args.maxTimeToProcessScrollPage,
			preserve_order=True):

		thisEntityId = doc["_source"][args.entityIdField]

		# end of event group
		if thisEntityId != lastEntityId:
			if events:
				yield get_action(events)
			events = []
			lastEntityId = thisEntityId
		events.append(doc["_source"])
		numDocsProcessed += 1
		if numDocsProcessed % 10000 == 0:
			elapsedSecs = int(time.time() - start)
			dps =  numDocsProcessed/elapsedSecs
			print numDocsProcessed, "docs per second=",dps

	# load last event group too
	if events:
		yield get_action(events)

	print "Processed", numDocsProcessed, "docs"

start = time.time()
helpers.bulk(es, generate_actions(),
	index=args.entityIndexName,
	doc_type=args.entityDocType,
	chunk_size=args.actionsPerBulk)

elapsed = (time.time() - start)
print "elapsed time=",elapsed

I haven't tried running it yet but Honza's the man.

Great. I'll deploy this into our instances. Thanks

@Mark_Harwood I've just come across a bug in this script. If the data is processed in less than one second, the script throws a division-by-zero error. It is an edge case, but simple to defend against.

if numDocsProcessed % 10000 == 0:
	elapsedSecs = int(time.time() - start)
	if elapsedSecs == 0:
		elapsedSecs = 1
	dps = numDocsProcessed/elapsedSecs

Thanks, Simon.
That one bit me too :)
For the record, the updated version with this fix is now on my original demo link at http://bit.ly/entcent

Great. Thanks.