Is pruning done asap?

Hello,

### Theoretical question:
We have 2 aggregations in cascade. The first-level aggregation's pruning is independent of its siblings (it does not depend on the calculation of the aggregations at the same level).

  1. Will Elasticsearch drop the aggregation (and not calculate the leaves) as soon as it has finished computing it, or will it wait for the completion of all the aggregations before entering the pruning phase?
  2. If so, and we use breadth_first collect mode, will the documents cached for a pruned bucket remain in memory?

Note: a document can be in at most one bucket at the end of the operation, and only about 5% of the documents analyzed will actually end up in the final buckets (after the pruning). The rest should not occupy memory.

### Practical example:
We have two notions: conversation and message. A message must be linked to exactly one conversation.
A message is a document with the id of its conversation and a date.
We want a date_histogram of the messages, grouped by week, restricted to conversations that have more than M messages.

"aggs": {
    "threads": {
      "terms": {
        "field": "conversationId",
        "min_doc_count":  M
      },
      "aggs": {
        "weeksConversations": {
          "date_histogram": {
            "field": "date",
            "interval": "7d",
            "min_doc_count": 1,
          }
        }
      }
    }
  }

We have a huge number of conversations and an even greater number of messages, but min_doc_count is large enough that it will cut over 95% of the message documents.

  1. Will Elasticsearch do the smart thing here and calculate the total number of messages in a conversation, then start the pruning phase for that bucket if necessary, before moving on to the leaves in depth_first mode or to the next bucket in breadth_first?
  2. If a conversation has been analyzed and its total number of messages is not greater than M, will the documents associated with it be discarded from memory before moving on to the next bucket in breadth_first?
  3. If the pruning conditions are independent and can be calculated at the first level, will Elasticsearch still calculate the leaves in depth_first collection mode?

I'm not fully clear on what your ideal end result will be.

Is it one "messages-over-time" bar chart or several (one per popular conversation)? Your description suggests the former but your agg does the latter.

Is your system sharded or using multiple indices (e.g. time-based)? Do you have any custom routing?
These are relevant questions to determine the cost and accuracy of the aggregation process. You may be forced to do some re-indexing trickery to efficiently calculate what you need here. Joining on high-cardinality fields in a distributed system is not recommended.

I am not trying to optimize a particular case; I am trying to understand how the aggregation algorithm works and when the pruning is done, in both depth_first and breadth_first.

In my example the most efficient approach would be to calculate the total number of messages in a conversation and, if it is lower than M, skip the calculation of the date_histogram and move on to the next conversation. Is this what Elasticsearch will do?

I know an even faster way would be to stop counting the messages in a conversation once the count passes M and start calculating the leaves, but I think that is not possible.

Breadth-first stores up all matching docIDs in each of the root buckets where it is applied. Once collection of the shard-local set of docs matching the query is complete the buckets are pruned and cached doc IDs are then pushed to child aggregations as a form of deferred collection.

Clearly a parent agg cannot prune on the basis of child agg properties (sum of a particular field etc) when it has not flowed any docs down to nested aggs. See https://www.elastic.co/guide/en/elasticsearch/guide/current/_preventing_combinatorial_explosions.html

Note that your use of min_doc_count is a global constraint, while shard_min_doc_count is what is applied locally to control collection behaviour on a shard. My comments re high-cardinality values and distributed systems still apply here: you need to understand the distributed aspects of this problem to appreciate the challenges and what can go wrong.
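
To make the distinction concrete, here is a rough sketch of where those settings would sit on your terms level (the breadth_first switch is collect_mode; treat the threshold values as placeholders rather than a tuning recommendation):

"aggs": {
  "threads": {
    "terms": {
      "field": "conversationId",
      "min_doc_count": M,
      "shard_min_doc_count": M,
      "collect_mode": "breadth_first"
    },
    "aggs": {
      "weeksConversations": {
        "date_histogram": {
          "field": "date",
          "interval": "7d",
          "min_doc_count": 1
        }
      }
    }
  }
}

Setting shard_min_doc_count to M is only safe if all of a conversation's messages are guaranteed to sit on the same shard; otherwise a shard could wrongly discard a conversation whose remaining messages live elsewhere.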

Thank you for clearing this up.

The parent agg is pruning on its own property (the number of messages that have the conversationId). The parent agg is completely independent of the child one.

After some further thinking (and your suggestions about distributed computing) I think I understand what the difference between breadth_first and depth_first is.

In depth_first, because a shard cannot know about the bucket status of its peers, it is forced to calculate without pruning between agg levels. After it has completed all the aggs locally it can send its findings to a master, which will determine what to discard.

In breadth_first, a shard finishes an agg level and sends the bucket information to the master. The master gathers the results from all the slaves and decides which buckets are pruned. The slaves receive the information about which buckets can be discarded, discard them, and continue to the next level, but this requires holding the ids of the documents associated with each bucket in the meantime.

Generally, trimming of data occurs at shard-level and at the final reducer node.
The shard_size and size properties control the doc volumes at these respective levels.
Depth-first and breadth-first are shard-local evaluation strategies only.
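
As a rough illustration, those two controls sit directly on the terms agg (the numbers here are arbitrary):

"terms": {
  "field": "conversationId",
  "size": 100,
  "shard_size": 500
}

Each shard returns at most shard_size candidate terms to the reducing node, which then trims the merged result down to size.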

Any analysis on high cardinality fields in a distributed system with related content scattered across shards is going to struggle. You aren't guaranteed to see all of the data around a given entity due to each shard potentially returning only a subset of the overall data about each entity from that shard. Sometimes you need to physically organise data in a way that is better suited to the analysis you are trying to perform which is where the idea of maintaining entity-centric indexes comes into play (in your case the entity might be a conversation with a running count of messages).

What if I use a parent-child association in order to place a conversation and its associated messages on the same shard?
This would make shard_min_doc_count and min_doc_count the same thing and allow me to use shard_min_doc_count to get rid of unwanted buckets in the first-level agg.
If I do this, can either the breadth_first or depth_first strategy cut a bucket without calculating the leaf buckets?
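
To illustrate what I mean, something along the lines of this 2.x mapping (the index and type names are just examples), where the _parent link routes every message to the same shard as its conversation:

PUT /conversations_v2
{
  "mappings": {
    "conversation": {},
    "message": {
      "_parent": { "type": "conversation" },
      "properties": {
        "conversationId": { "type": "string", "index": "not_analyzed" },
        "date": { "type": "date" }
      }
    }
  }
}

Each message would then be indexed with parent=<conversationId>, so it lands on its conversation's shard.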

Before we go down the route of evaluating that, are you using time-based indices to age out your content?
That would present an issue, because conversations could span time periods.

No, all the conversations are in the same index. Sorry for the delayed response, I had to read up on what you meant :slight_smile: and thanks for your quick responses.

If you are facing a never-ending stream of events it makes sense to have time-based indices to manage this content effectively. When your queries/analytics need to focus on derived attributes that don't exist directly on single documents (e.g. conversations' numMessages or duration or lastMessageDate) and the key that joins these docs is high cardinality (e.g. conversation ID) it can be prohibitively expensive/complex to derive all these attributes at query time. It starts to stretch the limits of what query-time aggregations can achieve. Instead, consider aggregating data in the background by maintaining an entity-centric index for your conversation entities. These can be updated with micro-batch jobs (running every hour/minute/whatever suits) to update "conversation" docs with just the latest messages. See architecture discussion https://www.youtube.com/watch?v=yBf7oeJKH2Y and related example scripts http://bit.ly/entcent
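
As a very rough sketch of the shape such an entity-centric document might take (the index, type and field names here are only illustrative):

PUT /conversation_entities/conversation/42
{
  "conversationId": "42",
  "numMessages": 137,
  "firstMessageDate": "2016-09-01T10:15:00Z",
  "lastMessageDate": "2016-10-03T18:42:00Z"
}

With a running numMessages maintained by the background job, the "more than M messages" condition becomes a simple range filter on the conversation docs rather than a query-time join across shards.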

Sorry for the delayed response. First of all, great video. Our architecture already has entity-centric indices, but we were thinking of using aggs instead (we just migrated from 1.7 to 2.4).

After some discussions we agreed to take your advice and keep them.

Thank you once again for your swift and valuable help!

Kind regards,
Mihai
