Naveen,
I think, until I discovered a compelling reason not to, I would use a document structure consisting of:
- Unique user ID
- Unique video ID
- Integer second
and index these events to indicate the user had played that second of that video.
I chose to have, for example, 7,200 distinct documents for a single user playing an entire 2 hour video for a specific reason that I'll explain later.
I might create the index this way:
PUT view_tracking
{
"settings": {
"number_of_replicas": 0,
"number_of_shards": 1
},
"mappings": {
"properties": {
"user": {
"type": "keyword"
},
"video": {
"type": "keyword"
},
"seconds": {
"type": "short"
}
}
}
}
(I think short should be big enough: its maximum value of 32,767 seconds covers videos a bit over nine hours long.)
Let's look at the scenario where two users have played videos; user A has played one for 10 seconds, user B has played that same one for 3 seconds, and another one for 10 seconds. Here's what bulk indexing that data looks like for our experiment:
POST view_tracking/_bulk
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":1}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":2}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":3}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":4}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":5}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":6}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":7}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":8}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":9}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":10}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"1A","seconds":1}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"1A","seconds":2}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"1A","seconds":3}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":1}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":2}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":3}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":4}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":5}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":6}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":7}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":8}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":9}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":10}
So how do we get our retention data?
If we ask for retention data for video "1A" at this point, we should get:
1-3 secs - 2 viewers
4-10 secs - 1 viewer
11-30 secs - 0 viewers
Because we want count information for several different ranges of a particular field, the natural solution is aggregations.
First, I want to apply a filter that says I'm only interested in video 1A. That will be agg "filter-video-1A". Then I'll break those results into ranges of my seconds field, which will be agg "seconds-ranges", inside the filter agg. The result of that will be the count of all documents in each specified range.
But wait. In that case, within the range bucket from 1 to 3 seconds, we have documents from each of 2 users at multiple seconds. We're going to count 4 documents (the range agg's "to" is exclusive, so this bucket covers seconds 1 and 2 for both users) instead of just 2, which is the number of unique users who have watched in that range!
To address that issue, I'm going to add an aggregation inside the range aggregation - a cardinality agg. I just tell it: inside each range bucket, count the number of unique values of a specific field, user. (Note that cardinality is an approximate count; it's effectively exact at counts this small, and you can raise its precision_threshold if you need tighter accuracy at scale.)
It looks like this:
POST view_tracking/_search?filter_path=aggregations
{
"size": 0,
"aggs": {
"filter-video-1A": {
"filter": {"term": {"video": "1A"}},
"aggs": {
"seconds-ranges": {
"range": {
"field": "seconds",
"ranges": [
{"from": 1, "to": 3},
{"from": 4, "to": 10},
{"from": 11, "to": 30}
]
},
"aggs": {
"unique_users": {
"cardinality": {
"field": "user"
}
}
}
}
}
}
}
}
The response looks like:
{
"aggregations" : {
"video-1A" : {
"doc_count" : 13,
"seconds_ranges" : {
"buckets" : [
{
"key" : "1.0-3.0",
"from" : 1.0,
"to" : 3.0,
"doc_count" : 4,
"unique_users" : {
"value" : 2
}
},
{
"key" : "4.0-10.0",
"from" : 4.0,
"to" : 10.0,
"doc_count" : 6,
"unique_users" : {
"value" : 1
}
},
{
"key" : "11.0-30.0",
"from" : 11.0,
"to" : 30.0,
"doc_count" : 0,
"unique_users" : {
"value" : 0
}
}
]
}
}
}
}
The number you are interested in is unique_users.value.
Now, suppose we had stayed with a document scheme where user_id + video_id is a unique document ID, and seconds is an array containing all of the relevant integer values. The aggregation above would have been simpler, right? No need for the cardinality agg.
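In that scheme, a document for user A might look like this (view_tracking_alt and the A_1A document ID are just illustrative names):
PUT view_tracking_alt/_doc/A_1A
{
  "user": "A",
  "video": "1A",
  "seconds": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}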
This is true, but it would have come at a significant operational expense.
I am supposing that updating the records will happen "near real time". That is, as a user is playing a video, each second they play will generate an event that will be sent to the indexing pipeline. (An alternate example model might be that the integer second numbers are batched up and written to the system, say, when the user stops playing for a sufficient time; I am assuming this is not the case here.)
In my model, as a user is playing, every second a new doc is indexed.
In the other model, every second an update is issued, so Elasticsearch fetches the existing doc, applies the update (adds another seconds value), indexes a new doc, and marks the previous doc as deleted. Then, in the background, the segment merge process reads old segments, looks for docs marked deleted, and writes new segments without the deleted docs (often combining several old segments into a single new one). In this scenario, that's a lot of deleted docs, which tends to have a significant impact on performance.
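For concreteness, each per-second event in that model would be something like this scripted update (again against the hypothetical view_tracking_alt index; the Painless script appends the new second if it isn't already present):
POST view_tracking_alt/_update/A_1A
{
  "script": {
    "source": "if (!ctx._source.seconds.contains(params.s)) { ctx._source.seconds.add(params.s) }",
    "params": { "s": 11 }
  }
}
Every one of those is a read-modify-write against an ever-growing document, which is where the deleted-doc churn comes from.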