Elasticsearch always crashes with aggregation queries

Hello!

We are integrating ES [2.4.0] with a new application. The application is distributed as an appliance with 32 CPUs and 64 GB of RAM. It collects different types of logs. In a normal installation we have 15 GB of data per day.
The application uses a lot of aggregation queries. This makes ES crash very often. We tried several node configurations.
First, we used 2 nodes with 16 GB of heap each, but the cluster would crash when we tried to launch several queries (for a dashboard) at the same time.
Following other configuration guides, we created a master node and a client node with no data, plus two data nodes, but it didn't solve our problem.
We tried creating more nodes with less heap each, but always hit the same problem.
We changed the circuit breaker settings and limited the fielddata cache with different values of indices.fielddata.cache.size and indices.breaker.fielddata.limit, but the node logs started to show lines like this:
[2016-09-16 12:11:29,339][WARN ][monitor.jvm ] [node-1] [gc][old][784][203] duration [44.9s], collections [2]/[46.4s], total [44.9s]/[1.1h], memory [7.8gb]->[7.8gb]/[7.8gb], all_pools {[young] [1.4gb]->[1.4gb]/[1.4gb]}{[survivor] [190.8mb]->[191.2mb]/[191.3mb]}{[old] [6.1gb]->[6.1gb]/[6.1gb]}
After several lines like this, the node hit an OutOfMemory error, died, and disappeared from the cluster.
What are we doing wrong? What is the best approach for our problem? What can we do to prevent the cluster from going red? We need to make the system more resilient.

We tried three or four shards and one replica.

Thx!

Can you show us the log where it crashed?

What JVM and OS are you using?
What does the mapping look like?
What are the aggs?
What is the node config?
How much data do you have?
How many indices, shards?

Hello!
What JVM and OS are you using?
We're using CentOS 7
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

What does the mapping look like?
{
  "http_template" : {
    "order" : 0,
    "template" : "http*",
    "settings" : {
      "index" : {
        "analysis" : {
          "analyzer" : {
            "string_analyzer" : {
              "filter" : [ "lowercase", "asciifolding" ],
              "tokenizer" : "keyword"
            }
          }
        },
        "number_of_shards" : "4",
        "number_of_replicas" : "1"
      }
    },
    "mappings" : {
      "default" : {
        "dynamic_templates" : [ {
          "string_template" : {
            "mapping" : {
              "analyzer" : "string_analyzer",
              "type" : "string"
            },
            "match_mapping_type" : "string",
            "match" : "*"
          }
        } ],
        "_all" : {
          "enabled" : false
        }
      }
    },
    "aliases" : { }
  }
}

What are the aggs?
We have many different aggs, one example is:
{
  "size": 0,
  "query": {
    "filtered": {
      "filter": {
        "bool": {
          "must": [
            {
              "range": {
                "fireEventDate": {
                  "from": "{{startDate}}",
                  "to": "{{endDate}}",
                  "include_lower": true,
                  "include_upper": false
                }
              }
            }
          ]
        }
      }
    }
  },
  "aggregations": {
    "host": {
      "terms": {
        "field": "host",
        "size": 0,
        "order": { "_term": "asc" }
      },
      "aggregations": {
        "ipDst": {
          "terms": {
            "field": "ipDst",
            "size": 0,
            "order": { "_term": "asc" }
          },
          "aggregations": {
            "country": {
              "terms": {
                "field": "country",
                "size": 0,
                "order": { "_term": "asc" }
              },
              "aggregations": {
                "min": { "min": { "field": "fireEventDate" } },
                "max": { "max": { "field": "fireEventDate" } }
              }
            }
          }
        }
      }
    }
  }
}

What is the node config?
...
cluster.name: myApp
node.name: node-1
path.data: /var/custom/elastic/data
bootstrap.mlockall: true

http.port: 9201
transport.tcp.port: 9301

discovery.zen.ping.unicast.hosts: ["127.0.0.1:9399"]
discovery.zen.ping.multicast.enabled: false

script.inline: true
script.indexed: true

cluster.routing.allocation.disk.threshold_enabled: True
cluster.routing.allocation.disk.watermark.low: 30gb
cluster.routing.allocation.disk.watermark.high: 20gb

indices.fielddata.cache.size: 75%
indices.breaker.fielddata.limit: 85%

node.master: false
node.data: true
http.enabled: false

We also tried without the cache size and breaker limit, or with very low percentages:
indices.fielddata.cache.size: 10%
indices.breaker.fielddata.limit: 15%

We tried two data nodes with 16 GB of heap, and four data nodes with 6 GB or 8 GB of heap each.
We have one dedicated master node with 1 GB and one dedicated client node. We tried 1 GB to 8 GB of RAM for the dedicated client.

How much data do you have? How many indices, shards?
We have two log types (HTTP and DNS) and we create one index per day (with about 80,000,000 hits and 15 GB per index, sometimes less), but we tried different configurations with similar results:

  • 3 shards and one replica per index, 1 index per day (about 20-30 GB per index)
  • 4 shards and one replica per index, 1 index per day (about 20-30 GB per index)
  • 5 shards and one replica per index, 1 index per day (about 20-30 GB per index)
  • 3 shards and one replica per index, 4 indices per day (about 5-6 GB per index)
  • 4 shards and one replica per index, 4 indices per day (about 5-6 GB per index)
  • 4 shards and one replica per index, 24 indices per day (about 1 GB per index)

We have other kinds of logs with fewer shards (1 primary and 1 replica).

At the moment:
{
"cluster_name" : "myApp",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 6,
"number_of_data_nodes" : 4,
"active_primary_shards" : 1047,
"active_shards" : 2094,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 100.0
}
This is more or less 20 days of data, but we need at least four times more retention.
Thank you very much!

This is important.

Judging by your agg, that is a single request asking for a report on all outbound IP address contacts from all hosts. Have you done any estimate of what the resulting JSON size would be? It looks like it has the potential to be large.
The "cardinality" agg should help in estimating this, by telling you for example how many unique IP addresses there are. It is worth considering whether this makes sense to do as a single request. If the estimated result size is measured in gigabytes, you should probably break it into multiple requests, e.g. one per host.

Thanks! I will try it.

After restarting the cluster and launching the query:
[2016-10-10 12:17:32,700][INFO ][node ] [node-1] version[2.4.0], pid[11397], build[ce9f0c7/2016-08-29T09:14:17Z]
[2016-10-10 12:17:32,700][INFO ][node ] [node-1] initializing ...
[2016-10-10 12:17:33,124][INFO ][plugins ] [node-1] modules [reindex, lang-expression, lang-groovy], plugins [delete-by-query], sites []
[2016-10-10 12:17:33,139][INFO ][env ] [node-1] using [1] data paths, mounts [[/elastic1 (/dev/sdb1)]], net usable_space [355.6gb], net total_space [549.4gb], spins? [possibly], types [ext4]
[2016-10-10 12:17:33,139][INFO ][env ] [node-1] heap size [7.8gb], compressed ordinary object pointers [true]
[2016-10-10 12:17:34,644][INFO ][node ] [node-1] initialized
[2016-10-10 12:17:34,644][INFO ][node ] [node-1] starting ...
[2016-10-10 12:17:34,755][INFO ][transport ] [node-1] publish_address {127.0.0.1:9301}, bound_addresses {[::1]:9301}, {127.0.0.1:9301}
[2016-10-10 12:17:34,759][INFO ][discovery ] [node-1] node/4rjtXlo7R12WHG0cRu8MUA
[2016-10-10 12:17:37,993][INFO ][cluster.service ] [node-1] detected_master {node-master}{I6O0cKF5RFuAi9CDqGfhsA}{127.0.0.1}{127.0.0.1:9399}{data=false, master=true}, added {{node-client}{qe5x7NxQR8ujA-rq6juScg}{127.0.0.1}{127.0.0.1:9300}{data=false, master=false},{node-master}{I6O0cKF5RFuAi9CDqGfhsA}{127.0.0.1}{127.0.0.1:9399}{data=false, master=true},}, reason: zen-disco-receive(from master [{node-master}{I6O0cKF5RFuAi9CDqGfhsA}{127.0.0.1}{127.0.0.1:9399}{data=false, master=true}])
[2016-10-10 12:17:38,044][INFO ][node ] [node-1] started
[2016-10-10 12:17:47,104][INFO ][cluster.service ] [node-1] added {{node-4}{GqrTd3qtTWmx7YdGOakPJA}{127.0.0.1}{127.0.0.1:9304}{master=false},{node-3}{a-gCYV7iRKuN15QQbapiww}{127.0.0.1}{127.0.0.1:9303}{master=false},{node-2}{Fc7lgzfbQbazIc1R4ikVEA}{127.0.0.1}{127.0.0.1:9302}{master=false},}, reason: zen-disco-receive(from master [{node-master}{I6O0cKF5RFuAi9CDqGfhsA}{127.0.0.1}{127.0.0.1:9399}{data=false, master=true}])
[2016-10-10 12:32:56,392][WARN ][monitor.jvm ] [node-1] [gc][young][921][16] duration [1s], collections [1]/[1.4s], total [1s]/[1.7s], memory [2.4gb]->[1.9gb]/[7.8gb], all_pools {[young] [1.2gb]->[13.8mb]/[1.4gb]}{[survivor] [7.3mb]->[191.3mb]/[191.3mb]}{[old] [1.2gb]->[1.7gb]/[6.1gb]}
[2016-10-10 12:33:01,743][INFO ][monitor.jvm ] [node-1] [gc][young][926][17] duration [915ms], collections [1]/[1.3s], total [915ms]/[2.7s], memory [3.4gb]->[2.6gb]/[7.8gb], all_pools {[young] [1.4gb]->[19.5mb]/[1.4gb]}{[survivor] [191.3mb]->[191.3mb]/[191.3mb]}{[old] [1.7gb]->[2.4gb]/[6.1gb]}
.... After several lines like these, the node goes away and never comes back:
[2016-10-10 12:39:44,028][INFO ][cluster.service ] [node-1] removed {{node-4}{GqrTd3qtTWmx7YdGOakPJA}{127.0.0.1}{127.0.0.1:9304}{master=false},}, reason: zen-disco-receive(from master [{node-master}{I6O0cKF5RFuAi9CDqGfhsA}{127.0.0.1}{127.0.0.1:9399}{data=false, master=true}])
[2016-10-10 12:40:07,012][WARN ][monitor.jvm ] [node-1] [gc][old][1094][20] duration [22.2s], collections [1]/[22.4s], total [22.2s]/[4.2m], memory [7.7gb]->[7.7gb]/[7.8gb], all_pools {[young] [1.4gb]->[1.4gb]/[1.4gb]}{[survivor] [177.2mb]->[86.6mb]/[191.3mb]}{[old] [6.1gb]->[6.1gb]/[6.1gb]}
More and more... all the nodes start garbage collecting like this, and after 10 minutes all nodes fail:
{
"cluster_name" : "myApp",
"status" : "red",
"timed_out" : false,
"number_of_nodes" : 2,
"number_of_data_nodes" : 0,
"active_primary_shards" : 0,
"active_shards" : 0,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 2112,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 0.0
}

If we are indexing data at the same time, the following error is logged:
[2016-09-28 11:45:49,089][WARN ][action.bulk ] [node-1] [http.2016.09.28.00][4] failed to perform indices:data/write/bulk[s][r] on node {node-2}{pxrCGZ0YSpKGO8oKl4-b5A}{
127.0.0.1}{127.0.0.1:9302}{master=false}
RemoteTransportException[[node-2][127.0.0.1:9302][indices:data/write/bulk[s][r]]]; nested: CircuitBreakingException[[parent] Data too large, data for [<transport_request>] would be lar
ger than limit of [5872530227/5.4gb]];
Caused by: CircuitBreakingException[[parent] Data too large, data for [<transport_request>] would be larger than limit of [5872530227/5.4gb]]
at org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.checkParentLimit(HierarchyCircuitBreakerService.java:247)
at org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker.addEstimateBytesAndMaybeBreak(ChildMemoryCircuitBreaker.java:156)
at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:214)
at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:116)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:75)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

And the last error:
[2016-10-10 12:50:00,021][WARN ][netty.channel.socket.nio.AbstractNioSelector] Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
at java.util.HashMap.resize(Unknown Source)
at java.util.HashMap.putVal(Unknown Source)
at java.util.HashMap.put(Unknown Source)
at java.util.HashSet.add(Unknown Source)
at sun.nio.ch.EPollSelectorImpl.updateSelectedKeys(Unknown Source)
at sun.nio.ch.EPollSelectorImpl.doSelect(Unknown Source)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(Unknown Source)
at sun.nio.ch.SelectorImpl.select(Unknown Source)
at org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2016-10-10 12:50:00,021][WARN ][netty.channel.socket.nio.AbstractNioSelector] Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
at java.nio.DirectByteBuffer.(Unknown Source)
at java.nio.ByteBuffer.allocateDirect(Unknown Source)
at org.jboss.netty.channel.socket.nio.SocketReceiveBufferAllocator.newBuffer(SocketReceiveBufferAllocator.java:64)
at org.jboss.netty.channel.socket.nio.SocketReceiveBufferAllocator.get(SocketReceiveBufferAllocator.java:41)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:62)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Hi Mark,
did you see the logs?

Thx!

Looks like memory. Did you do the size estimation I suggested on how big your single-query response would be?

I launched the following query:
{
  "from" : 0,
  "size" : 0,
  "query" : {
    "bool" : {
      "must" : {
        "match_all" : { }
      },
      "filter" : {
        "bool" : {
          "must" : {
            "range" : {
              "fireEventDate" : {
                "from" : "2016-10-04T00:00:00.000+02:00",
                "to" : "2016-10-05T00:00:00.000+02:00",
                "include_lower" : true,
                "include_upper" : false
              }
            }
          }
        }
      }
    }
  },
  "aggregations" : {
    "agg" : {
      "terms" : {
        "field" : "ipSrc",
        "size" : 10,
        "order" : { "_count" : "asc" }
      },
      "aggregations" : {
        "count_agg" : {
          "terms" : { "field" : "host" }
        }
      }
    }
  }
}
This is a one-day query over one index:
health status index pri rep docs.count docs.deleted store.size pri.store.size
green open http.2016.10.04.00 4 1 39440891 0 13.8gb 6.9gb

I have 1,597,702 distinct IPs and 86,561 distinct hosts. I have 4 nodes with 8 GB of heap each.
After 10 minutes the cluster becomes red.
Is the problem the size of the result? I'm using "size": 10.

Thanks again!

Your root agg is the 10 rarest ipSrc values?

(this is concerning for a number of reasons...)

Can we start with a definition of what business problem you are trying to solve?

Hi again!
Of course I can explain our problem.
We have a Java/Python app that analyzes network traffic from different sources, mainly HTTP and DNS traffic. We have about 40-50 million hits per day for each source type.
In the past the app worked with PostgreSQL, but it was very slow. I bet strongly on Elasticsearch, but now we have other big problems.
We tried different configurations (number of nodes, number of shards and replicas, different circuit breaker limits, etc.), but we always got the same result: the cluster turns red because all the nodes die.
We have a lot of queries that aggregate the data. Our app uses the results of these queries to find several kinds of things. We need to aggregate data from one or two months back, but the reality is that we cannot even manage one day.
An example is the query above. Also, we need to insert data at the same time we query it, but as nodes die the data becomes inconsistent and we have to remove the indices that have unassigned shards.
An example of data is (see mapping above):

{
  "creationDate": "2016-09-14T00:46:37.469Z",
  "host": "wengen.com",
  "bytesReceived": 150,
  "bytesTransmited": 50,
  "fireEventDate": "2016-09-14T00:46:36.642Z",
  "responseTime": 0,
  "httpMethod": "GET",
  "ipSrc": "10.169.8.213",
  "ipDst": "198.111.122.90",
  "protocol": "http",
  "resultCode": "503",
  "uri": "http://wengen.com/hotel/regina",
  "tld": "com",
  "userAgent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36",
  "referer": "http://wengen.com",
  "mimeType": "text/html"
}

And a complex aggregation could be:

...
"aggregations" : {
  "ipSrc": {
    "terms" : {
      "field" : "ipSrc",
      "order" : { "_count" : "asc" }
    },
    "aggregations" : {
      "host": {
        "terms" : {
          "field" : "host",
          "order" : { "_count" : "asc" }
        },
        "aggregations" : {
          "httpMethod" : {
            "cardinality" : { "field" : "httpMethod" }
          },
          "postCondition": {
            "filter" : { "term": { "httpMethod" : "post" } }
          },
          "having" : {
            "bucket_selector": {
              "buckets_path": {
                "httpMethod_diff": "httpMethod",
                "postCond" : "postCondition._count"
              },
              "script": "(httpMethod_diff == 1) && (postCond>0)"
            }
          }
        }
      }
    }
  }
}

Sorry for my English.
Thanks!

Thanks for the background.
Let's dig into why the rarest 10 ip addresses would be of interest. I assume that each of the rarest ips occurs only once. Why would that be useful? Are you aware that for a high cardinality field with low frequency (as in your example) the potential for error is unbounded? Let's assume you have 2 shards and they are each asked to return their lowest-frequency IPs. Shard 1 may have a thousand IPs which have only been seen once so arbitrarily returns 10 of these values. Shard 2 does the same. The problem is that an IP in shard 1's results may have occurred a million times on shard 2 so was not returned in shard 2's set of results. The reduced result from both shards therefore includes a highly inaccurate response and a fraction of the total number of IPs that are the "least-frequently seen".
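If you want to see how far off the counts can get on your own data, the terms agg can be asked to report its worst-case error; a minimal sketch against your ipSrc field (the agg name is illustrative):

{
  "size": 0,
  "aggregations": {
    "rare_ips": {
      "terms": {
        "field": "ipSrc",
        "size": 10,
        "order": { "_count": "asc" },
        "show_term_doc_count_error": true
      }
    }
  }
}

If I remember the 2.x behaviour correctly, with ascending _count ordering the per-bucket doc_count_error_upper_bound comes back as -1, which is Elasticsearch's way of saying the error cannot be bounded at all.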

So the question is what risk are you looking to detect with your query?

Hello
We have something similar with our cluster (2.4.1).
Is there a way to prevent the loss of a node?

We have 600 indices and many different queries; trying to optimize each one is just impossible.
But we would expect the cluster to protect itself and not crash when a query is too expensive.

I think that many queries (around 15) arrive at the same time with aggs that need the fielddata cache (linked to indices.breaker.request.limit?) and the master crashes...

thanks :=

Why would the master crash under query load? Do you follow our recommendations and have dedicated master nodes?
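For reference, a dedicated master in 2.x is just a node whose elasticsearch.yml carries something like the following (same style as the config shown earlier in the thread; the node name is illustrative):

# dedicated master-eligible node: manages cluster state only,
# never holds data and never serves aggregations
node.name: node-master
node.master: true
node.data: false

Keeping masters out of the query path means a data node blowing its heap on an aggregation does not take the cluster state down with it.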

That sounds like a lot - how many nodes?

See Support in the Wild: My Biggest Elasticsearch Problem at Scale | Elastic Blog

Hi Mark!
I think I did not explain it well. I don't want the rarest IPs. Let me try again with another example.
We have the following query (I used the pre tag :smiley: this time):

{
  "size": 0,
  "query": {
    "filtered": {
      "filter": {
        "bool": {
          "must": [
            {
              "range": {
                "bytesTransmited": { "gt": 1024 }
              }
            },
            {
              "term": { "resultCode": "404" }
            }
          ]
        }
      }
    }
  },
  "aggregations": {
    "uri": {
      "terms": {
        "field": "uri"      
      },
      "aggregations": {
        "ipSrc": {
          "terms": {
            "field": "ipSrc",
            "order": {
              "_term": "asc"
            }
          },
          "aggregations": {
            "bytesTransmited": {
              "terms": {
                "field": "bytesTransmited",
                "order": {
                  "_term": "asc"
                }
              }
            }
          }
        }
      }
    }
  }
}

I want the URI and IP of the requests where the result code is 404 and we received more than 1024 bytes (actually this is a parameter; I just used 1024 as an example). This may seem strange because a 404 response does not usually involve many bytes. As you know, the number of different URIs in normal navigation can be very high. For us it is very important to find this kind of result.
Is this example clear?

Thanks!

OK - your previous example was different.
Your latest example has a high-cardinality field at the root (URIs that happen to be 404s) and you are trying to find the 10 most popular. I suggest you look at the "breadth_first" collection mode. See https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_preventing_combinatorial_explosions.html
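Applied to your last query, that would just mean adding collect_mode to the root terms agg, roughly like this (only the aggregations section is shown):

"aggregations": {
  "uri": {
    "terms": {
      "field": "uri",
      "collect_mode": "breadth_first"
    },
    "aggregations": {
      "ipSrc": {
        "terms": { "field": "ipSrc", "order": { "_term": "asc" } },
        "aggregations": {
          "bytesTransmited": {
            "terms": { "field": "bytesTransmited", "order": { "_term": "asc" } }
          }
        }
      }
    }
  }
}

With breadth_first the top uri buckets are selected first and the ipSrc / bytesTransmited sub-aggregations are only built for the surviving buckets, instead of materialising the whole uri x ipSrc x bytesTransmited tree up front.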