Per second aggregation across documents

Hi Team,

We are using ES as an analytics engine for our video platform. Users watch videos, and for every session we capture which second of the video was watched. We have a use case where we need to plot a retention graph for each video. In other words, we should be able to show how many users watched a given second of the video. For each session we keep an array with one entry per second of the video's duration: 0 for not watched and 1 for watched. So a 1-hour video has a 3600-element array of 1s and 0s. This is currently stored as a byte array in the index.

Please help us with the following

  1. Are we storing the data in the index in the correct way? If not, what are the alternatives?
  2. How can we get the data to paint the retention graph? Basically, we should be able to produce output like:
    0-1secs - 100 viewers,
    1-2secs - 98 viewers,
    10-20secs - 80 viewers

Thanks

Naveen,

I think until I discovered a compelling reason not to, I would use a document structure consisting of

  • Unique user ID
  • Unique video ID
  • Integer second

and index these events to indicate the user had played that second of that video.
I chose to have, for example, 3,600 distinct documents for a single user playing an entire one-hour video for a specific reason that I'll explain later.

I might create the index this way:

PUT view_tracking
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "user": {
        "type": "keyword"
      },
      "video": {
        "type": "keyword"
      },
      "seconds": {
        "type": "short"
      }
    }
  }
}

(A short maxes out at 32,767, so it should be big enough for videos up to about nine hours.)

Let's look at the scenario where two users have played videos; user A has played one for 10 seconds, user B has played that same one for 3 seconds, and another one for 10 seconds. Here's what bulk indexing that data looks like for our experiment:

POST view_tracking/_bulk
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":1}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":2}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":3}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":4}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":5}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":6}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":7}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":8}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":9}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"A","video":"1A","seconds":10}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"1A","seconds":1}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"1A","seconds":2}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"1A","seconds":3}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":1}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":2}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":3}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":4}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":5}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":6}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":7}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":8}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":9}
{ "index" : { "_index" : "view_tracking" } }
{ "user":"B","video":"9G","seconds":10}

So how do we get our retention data?

If we ask for retention data for video "1A" at this point, we should get:
1-3secs - 2 viewers
4-10secs - 1 viewer
11-30secs - 0 viewers

Because we want count information for several different ranges of a particular field, the natural solution is aggregations.

First, I want to apply a filter that says I'm only interested in video 1A. That will be agg "filter-video-1A". Then I'll break those results into ranges of my seconds field, which will be agg "seconds-ranges", inside the filter agg. The result of that will be the count of all documents in each specified range.

But wait. In that case, within the range bucket from 1 to 3 seconds (the range agg's "to" is exclusive, so that bucket covers seconds 1 and 2), we have a document for each of 2 users at each of those seconds. We're going to get 4 documents instead of just 2, which is the number of unique users who have watched in that range!

To address that issue, I'm going to add an aggregation inside the range aggregation - a cardinality agg. I just tell it, inside each range bucket, count the number of unique values of a specific field: user.

It looks like this:

POST view_tracking/_search?filter_path=aggregations
{
  "size": 0,
  "aggs": {
    "filter-video-1A": {
      "filter": {"term": {"video": "1A"}},
      "aggs": {
        "seconds-ranges": {
          "range": {
            "field": "seconds",
            "ranges": [
              {"from": 1, "to": 3},
              {"from": 4, "to": 10},
              {"from": 11, "to": 30}
            ]
          },
          "aggs": {
            "unique_users": {
              "cardinality": {
                "field": "user"
              }
            }
          }
        }
      }
    }
  }
}

The response looks like:

{
  "aggregations" : {
    "video-1A" : {
      "doc_count" : 13,
      "seconds_ranges" : {
        "buckets" : [
          {
            "key" : "1.0-3.0",
            "from" : 1.0,
            "to" : 3.0,
            "doc_count" : 4,
            "unique_users" : {
              "value" : 2
            }
          },
          {
            "key" : "4.0-10.0",
            "from" : 4.0,
            "to" : 10.0,
            "doc_count" : 6,
            "unique_users" : {
              "value" : 1
            }
          },
          {
            "key" : "11.0-30.0",
            "from" : 11.0,
            "to" : 30.0,
            "doc_count" : 0,
            "unique_users" : {
              "value" : 0
            }
          }
        ]
      }
    }
  }
}

The number you are interested in is unique_users.value.

Now, suppose we had stayed with a document scheme where user_id + video_id is a unique document ID, and seconds is an array with all of the relevant integer values indexed. The aggregation above would have been simpler, right? No need for the cardinality agg.

This is true, but it would have come at a significant operational expense.
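For illustration, here is a minimal sketch of that alternative scheme (the index name and document ID convention are my own made-up examples, and I'm assuming the same keyword/short mapping as before): one document per user + video, with every watched second in an array. A plain range aggregation's doc_count is then already the viewer count, because each user/video document lands in a given bucket at most once, no matter how many of its seconds fall inside it:

PUT view_tracking_by_user/_doc/A-1A
{ "user": "A", "video": "1A", "seconds": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] }

POST view_tracking_by_user/_search?filter_path=aggregations
{
  "size": 0,
  "query": { "term": { "video": "1A" } },
  "aggs": {
    "seconds-ranges": {
      "range": {
        "field": "seconds",
        "ranges": [
          { "from": 1, "to": 3 },
          { "from": 4, "to": 10 },
          { "from": 11, "to": 30 }
        ]
      }
    }
  }
}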

I am supposing that updating the records will happen "near real time". That is, as a user is playing a video, each second they play will generate an event that will be sent to the indexing pipeline. (An alternate example model might be that the integer second numbers are batched up and written to the system, say, when the user stops playing for a sufficient time; I am assuming this is not the case here.)

In my model, as a user is playing, every second a new doc is indexed.
In the other model, every second an update is issued, so Elasticsearch fetches the existing doc, applies the update (adds another seconds value), indexes a new doc, and marks the previous doc as deleted. Then, in the background, the segment merge process reads old segments, looks for docs marked deleted, and writes new segments without the deleted docs (often combining several old segments into a single new one). In this scenario, that's a lot of deleted docs, which tends to have a significant impact on performance.
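To make that concrete, here is a sketch of what each played second turns into in the array model (the _update call, doc ID scheme, and Painless script are my assumptions about how such a pipeline might be built, not something from your current setup):

POST view_tracking_by_user/_update/A-1A
{
  "script": {
    "source": "if (!ctx._source.seconds.contains(params.s)) { ctx._source.seconds.add(params.s) }",
    "params": { "s": 11 }
  },
  "upsert": { "user": "A", "video": "1A", "seconds": [11] }
}

Every one of those calls re-reads and re-writes the whole, ever-growing document and leaves another deleted version behind for merging to clean up.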

Oh, and you should have a timestamp field on each doc.
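For example (the field name here is just a suggestion), that can be as simple as adding a date field to the mapping and setting it on every event you index:

PUT view_tracking/_mapping
{
  "properties": {
    "@timestamp": { "type": "date" }
  }
}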

Thank you very much for the complete and detailed explanation, Glen_Smith!!

I can see how easy it gets to aggregate with your model. We are planning to keep this data for at least 1 year for our analytics use case. Storage-wise, do you think it is OK if we have 50,000 users watching a 1-hour video daily?

We also build an aggregated index on a daily basis per video from the raw view events, with attributes like the number of users who watched the video, to reduce storage and improve performance. In this aggregated index, we plan to divide the video duration into 20 segments (i.e. a 1-hour video will be divided into 3-minute segments) and keep counts of how many users watched each segment. So this aggregated index will have 20 attributes, one corresponding to each segment. Do you see any issue with this approach?

Thanks again for your time and efforts!

In terms of capacity planning, you're going to need to consider three main things:

  • storage capacity
  • write capacity (indexing)
  • read capacity (searching)

The best thing to do at this stage is some serious testing. You can start on a developer workstation to get your test methods ironed out, but then you'll want to port that to the expected deployment environment/resources. (e.g. the SSD on your laptop is a lot zippier than most network-mounted storage from cloud providers.)

(Definitely investigate Rally!)

For write testing, be sure to test over a sufficient duration, allowing the data nodes to become as full as you envision your production nodes being. You don't want to fall into the trap of repeatedly testing from an empty cluster and concluding you can sustain that indexing rate in perpetuity: as your indices grow large, both Java memory management and stored segment management (merging) become increasingly dominant performance concerns.

As what you describe is potentially a pretty large amount of data, I do like your idea of distilling the raw data into a more compact form over time, in particular with an eye toward how you need to query the older data. Be sure to have a look at rollup capabilities (requires at least a Basic license).
You should certainly use time-based indices for your initial events. This way, your external process can read one of those indices, apply your compaction strategy by writing to a new index, and then drop the original index.
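For example (the index naming convention and the date here are just an illustration, using the timestamp field suggested earlier), the raw events could go into a per-day index, and once a day has been distilled into its compact form, the raw index can simply be dropped:

POST view_tracking-2020.06.01/_doc
{ "user": "A", "video": "1A", "seconds": 1, "@timestamp": "2020-06-01T10:15:01Z" }

DELETE view_tracking-2020.06.01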
Some compression ideas that might be useful at various ages & stages of your data:

  • Merge all docs for a given user + video into a single doc with an array of seconds
  • Reduce resolution to minutes
  • Drop association of data with each user and pivot the data into documents with video_id and counts of users per time interval
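To illustrate that last idea (the index name, interval size, and field names are all hypothetical), the pivoted data might end up as one small document per video, per day, per interval, which makes painting the retention graph a trivial fetch with no cardinality agg at all:

PUT video_retention_daily/_doc/1A-2020.06.01-0
{
  "video": "1A",
  "date": "2020-06-01",
  "interval_start_second": 0,
  "interval_end_second": 180,
  "viewers": 100
}

Your 20-segments-per-document variant is the same idea with all the intervals folded into a single document per video per day; either shape should work, as long as the interval size matches how you want to query the data later.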

If your business demands that you expose this per-second aggregation capability for the last year of viewing, and your usage is 50k viewers at an hour a day each (that's 50,000 × 3,600 ≈ 180 million documents per day, or roughly 66 billion per year at one document per watched second), I wouldn't be surprised to see a year of data reaching into the tens of terabytes if no effort is spent to condense it. That is certainly attainable (I've seen much larger), but it has to be sufficiently responsive to your queries, and that requires physical resources. After you've done enough testing to be comfortable presenting a ballpark estimate of your hosting requirements, the strategies to reduce that come into play (including, of course, possible compromise on the business requirements).


Thank you very much, Glen! That was very helpful, and I'm glad that your ideas are in line with our thought process.
