Performance aggregations vs collapsing


(Jnioche) #1

Hi,

I'm using aggregations over individual shards - these are large (40M docs per shard) but made of small documents. The aggregations are pretty simple

GET status/_search?preference=_shards:5

{
"from": 0,
"size": 0,
"query": {
"range": {
"nextFetchDate": {
"from": null,
"to": "2018-10-10T12:32:33+01:00",
"include_lower": true,
"include_upper": true,
"boost": 1.0
}
}
},
"explain": false,
"track_total_hits": false,
"aggregations": {
"sample": {
"diversified_sampler": {
"field": "hostname",
"shard_size": 5000,
"max_docs_per_value": 2
},
"aggregations": {
"partition": {
"terms": {
"field": "hostname",
"size": 2500,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [{
"top_hit": "asc"
}, {
"_key": "asc"
}]
},
"aggregations": {
"docs": {
"top_hits": {
"from": 0,
"size": 2,
"version": false,
"explain": false,
"sort": [{
"nextFetchDate": {
"order": "asc"
}
}]
}
},
"top_hit": {
"min": {
"field": "nextFetchDate"
}
}
}
}
}
}
}
}

i.e. 2500 buckets containing up to 2 docs. The performance is constant regardless of the number of buckets or docs within buckets, which makes sense given that most of the time is probably spent iterating over all the matching documents anyway.

When I compare the performance of such an aggregation query with a similar query using FieldCollapsing, the latter is a lot slower and needs more memory. It is even slower than using the aggregation without sampling.

Reading the PR and documentation, I was under the impression that Field Collapsing would be faster and cheaper than aggregations but empirical evidence shows otherwise. Am I missing something? Or is it that this is not the right use case for Field Collapsing?

Thanks!


(Mark Harwood) #2

Not sure what business problem you're trying to solve there.
Won't the diversified sampler give you a somewhat random choice of 2 docs per hostname (given the example query's scoring logic)? Then for each of those pairs you are getting the earliest. What is this intended to tell you?


(Jimferenczi) #3

Can you also share the different queries you're benchmarking ? Just in terms of performance I'd expect the sampler and field collapsing queries to behave similarly. Can you share how you compared them and the response times you get for these queries ? The performance of field collapsing depends greatly on the cardinality of the field you collapse on. How many different host names do you have in your index ?
The other difference that matters here is exactness, the sampler and terms solution could miss hits depending on where documents are (if the number of shards is greater than 1) whereas the field collapsing will always be accurate if you use inner_hits (which is a separate phase that we run at the end of the search).


(Jnioche) #4

Hi @Mark_Harwood. The aggregation above is used for selecting URLs for a web crawler to fetch. We don't want too many URLs per bucket (hostname) as we crawl politely and need as good a diversity of URLs to optimise the use of bandwidth by fetching in parallel.
The diversified sampler greatly improves the performance but we lose the guarantee that the URLs are definitely the ones we need to fetch first, which is an acceptable tradeoff for large crawls.


(Mark Harwood) #5

Maybe terms partitioning is a better solution here?

The benefits are you can perform an exhaustive sweep of all hostnames using multiple "partitions" and for each partition sort the hosts by the last-crawled date.


(Jnioche) #6

Hi @jimczi, thanks for your reply.

The queries based on the field collapsing look like this

{
"from": 0,
"size": 2500,
"query": {
	"range": {
		"nextFetchDate": {
			"from": null,
			"to": "2018-10-13T06:34:36+01:00",
			"include_lower": true,
			"include_upper": true,
			"boost": 1.0
		}
	}
},
"explain": false,
"sort": [{
	"nextFetchDate": {
		"order": "asc"
	}
}],
"track_total_hits": false,
"collapse": {
	"field": "hostname",
	"inner_hits": {
		"name": "urls_per_bucket",
		"ignore_unmapped": false,
		"from": 0,
		"size": 2,
		"version": false,
		"explain": false,
		"track_scores": false,
		"sort": [{
			"nextFetchDate": {
				"order": "asc"
			}
		}]
	}
}

}

I compare them both in situ by measuring how long the queries take while running the crawler and externally using the console in Kibana.

For this crawl, each shard has around 68K unique values for the hostname field.

Here are the times I got using the console using a match all query to keep things simple => Benchmark ES queries

NOTE: I haven't quite finished measuring with the sampling on

As expected the measurements for the (non-sampled) aggregations are pretty constant which is why my strategy is to get loads of buckets with 2 or URLs in each. It takes some time but we don't need to query often.

With the field collapsing the times are proportional to the number of buckets retrieved but looking at the average time per doc, it is worth getting a large number of buckets as well.

In practice, we try to reuse nextFetchDates from one query to the other to benefit from any caching. I will rerun the queries in Kibana using a different nextFetchDate every time - but using some time in the distant future to get the same number of documents but without any caching that query": { "match_all": {}} could have caused.

The values observed in situ were imperfect (more docs were being added to the index between the different runs) but gave some idea of which strategy to adopt:

AGGREG + sampling => AVERAGE QUERY TIME 2214 msec @ 18.57M docs
AGGREG - sampling => AVERAGE QUERY TIME 3170 msec @ 17.6 M docs
COLLAPSING => AVERAGE QUERY TIME 6290 msec @ 19.6M docs

The field collapsing triggered loads of CircuitBreakingException until I raised the Xmx.

What I should add to my logs is the average time per document retrieved - we don't necessarily get the exact number of documents we want: some buckets can have only 1 doc for instance.

I am wondering whether there are any scenarii were the Field Collapsing has its benefits e.g. when the number of hosts is limited, which would be the case for a vertical crawl.

I will have a look at Mark's suggestion. Thanks!


(Jimferenczi) #7

I am not surprised by these results. The collapse part should be fast, what slows down things is the inner_hits retrieval on 2500 top hits. From the docs:

The expansion of the group is done by sending an additional query for each inner_hit request for each collapsed hit returned in the response. This can significantly slow things down if you have too many groups and/or inner_hit requests.

The performance without inner_hits should be much better no matter what the size is.
If you really need this amount of top docs ("size":2500) and inner hits ("size":2) you should use aggregations.

The field collapsing triggered loads of CircuitBreakingException until I raised the Xmx.

Do you have logs for these failures ? Can you share them ?


(Jnioche) #8

The performance without inner_hits should be much better no matter what the size is.

That's what I was expecting but without using inner hits it took 6.9 secs to retrieve 2500 docs and 8 secs to get 3 times as many

buckets	| urls per bucket |	total	time |	average per doc
2500|1|2500|6930|2.77|
2500|2|5000|8097|1.62|
2500|3|7500|8024|1.07|

unless I got my test wrong, the overhead of the inner hits wasn't that great and so it was worth getting more URLs per bucket.

Do you have logs for these failures ? Can you share them ?

Haven't kept the logs only the following snippet in my notes

"reason": "java.util.concurrent.ExecutionException: CircuitBreakingException[[fielddata] Data too large, data for [_id] would be [1600542593/1.4gb], which is larger than the limit of [622775500/593.9mb]]"


(Jimferenczi) #9

unless I got my test wrong, the overhead of the inner hits wasn't that great and so it was worth getting more URLs per bucket.

So the first result is the for the query without inner_hits ? :

"collapse": {
	"field": "hostname"
}

That's not what I'd expect I agree. Do you have consistent results in your benchmark ?

reason": "java.util.concurrent.ExecutionException: CircuitBreakingException[[fielddata] Data too large, data for [_id] would be [1600542593/1.4gb], which is larger than the limit of [622775500/593.9mb]]"

That should not be related to field collapsing. This circuit breaker exception is about loading the field data for the _id field so it is generated by another request.


(Jnioche) #10

@jimczi

Here is what I am getting without inner_hits

   2018-11-13 16:24:50.034 c.d.s.e.p.CollapsingSpout I/O dispatcher 18 [INFO] [spout #4]  ES query returned 2500 hits from 2500 buckets in 6477 msec with 0 already being processed.Took 2.5908 msec per doc on average.
    2018-11-13 16:24:53.935 c.d.s.e.p.CollapsingSpout I/O dispatcher 14 [INFO] [spout #0]  ES query returned 2500 hits from 2500 buckets in 10283 msec with 0 already being processed.Took 4.1132 msec per doc on average.
    2018-11-13 16:24:56.609 c.d.s.e.p.CollapsingSpout I/O dispatcher 20 [INFO] [spout #6]  ES query returned 2500 hits from 2500 buckets in 13018 msec with 0 already being processed.Took 5.2072 msec per doc on average.
    2018-11-13 16:24:58.496 c.d.s.e.p.CollapsingSpout I/O dispatcher 16 [INFO] [spout #8]  ES query returned 2500 hits from 2500 buckets in 14981 msec with 0 already being processed.Took 5.9924 msec per doc on average.
    2018-11-13 16:25:00.677 c.d.s.e.p.CollapsingSpout I/O dispatcher 12 [INFO] [spout #2]  ES query returned 2500 hits from 2500 buckets in 17157 msec with 0 already being processed.Took 6.8628 msec per doc on average.
    2018-11-13 16:24:55.668 c.d.s.e.p.CollapsingSpout I/O dispatcher 1 [INFO] [spout #9]  ES query returned 2500 hits from 2500 buckets in 11760 msec with 0 already being processed.Took 4.704 msec per doc on average.
    2018-11-13 16:24:55.927 c.d.s.e.p.CollapsingSpout I/O dispatcher 10 [INFO] [spout #7]  ES query returned 2500 hits from 2500 buckets in 12053 msec with 0 already being processed.Took 4.8212 msec per doc on average.
    2018-11-13 16:24:56.031 c.d.s.e.p.CollapsingSpout I/O dispatcher 35 [INFO] [spout #1]  ES query returned 2500 hits from 2500 buckets in 12476 msec with 0 already being processed.Took 4.9904 msec per doc on average.
    2018-11-13 16:24:57.253 c.d.s.e.p.CollapsingSpout I/O dispatcher 32 [INFO] [spout #3]  ES query returned 2500 hits from 2500 buckets in 14111 msec with 0 already being processed.Took 5.6444 msec per doc on average.
    2018-11-13 16:25:04.657 c.d.s.e.p.CollapsingSpout I/O dispatcher 28 [INFO] [spout #5]  ES query returned 2500 hits from 2500 buckets in 21516 msec with 0 already being processed.Took 8.6064 msec per doc on average.

and with 2 inner hits

2018-11-13 16:19:52.532 c.d.s.e.p.CollapsingSpout I/O dispatcher 32 [INFO] [spout #4]  ES query returned 4476 hits from 2500 buckets in 11337 msec with 0 already being processed.Took 2.532842 msec per doc on average.
2018-11-13 16:20:00.375 c.d.s.e.p.CollapsingSpout I/O dispatcher 24 [INFO] [spout #0]  ES query returned 4833 hits from 2500 buckets in 19077 msec with 0 already being processed.Took 3.9472377 msec per doc on average.
2018-11-13 16:21:09.415 c.d.s.e.p.CollapsingSpout I/O dispatcher 36 [INFO] [spout #6]  ES query returned 4646 hits from 2500 buckets in 27234 msec with 0 already being processed.Took 5.8618164 msec per doc on average.
2018-11-13 16:21:12.204 c.d.s.e.p.CollapsingSpout I/O dispatcher 10 [INFO] [spout #8]  ES query returned 4826 hits from 2500 buckets in 30004 msec with 0 already being processed.Took 6.217157 msec per doc on average.
2018-11-13 16:21:23.943 c.d.s.e.p.CollapsingSpout I/O dispatcher 10 [INFO] [spout #2]  ES query returned 4423 hits from 2500 buckets in 11680 msec with 0 already being processed.Took 2.6407416 msec per doc on average.
2018-11-13 16:20:01.654 c.d.s.e.p.CollapsingSpout I/O dispatcher 11 [INFO] [spout #9]  ES query returned 4629 hits from 2500 buckets in 20737 msec with 0 already being processed.Took 4.479801 msec per doc on average.
2018-11-13 16:21:07.298 c.d.s.e.p.CollapsingSpout I/O dispatcher 23 [INFO] [spout #7]  ES query returned 4780 hits from 2500 buckets in 24732 msec with 0 already being processed.Took 5.1740584 msec per doc on average.
2018-11-13 16:21:09.820 c.d.s.e.p.CollapsingSpout I/O dispatcher 11 [INFO] [spout #1]  ES query returned 4662 hits from 2500 buckets in 27324 msec with 0 already being processed.Took 5.861004 msec per doc on average.
2018-11-13 16:21:12.359 c.d.s.e.p.CollapsingSpout I/O dispatcher 25 [INFO] [spout #3]  ES query returned 4781 hits from 2500 buckets in 28875 msec with 0 already being processed.Took 6.0395317 msec per doc on average.
2018-11-13 16:21:26.479 c.d.s.e.p.CollapsingSpout I/O dispatcher 25 [INFO] [spout #5]  ES query returned 4497 hits from 2500 buckets in 13984 msec with 0 already being processed.Took 3.1096287 msec per doc on average.

Here is what the request looks like

ES query SearchRequest{searchType=QUERY_THEN_FETCH, indices=[status], indicesOptions=IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false], types=[status], routing='null', preference='_shards:2', requestCache=null, scroll=null, maxConcurrentShardRequests=0, batchedReduceSize=512, preFilterShardSize=128, allowPartialSearchResults=null, source={"from":0,"size":2500,"query":{"range":{"nextFetchDate":{"from":null,"to":"2018-11-13T16:24:42Z","include_lower":true,"include_upper":true,"boost":1.0}}},"explain":false,"sort":[{"nextFetchDate":{"order":"asc"}}],"track_total_hits":false,"collapse":{"field":"hostname"}}}

for the first set of results.

In absolute terms, not using inner_hits is faster but when we look at the average time spent per doc this is not the case.

Re-circuit breaker: not sure what could have caused it

Thanks!


(Jnioche) #11

by comparison here are the logs with the sampled aggregations for 2500 buckets containing 2 docs

2018-11-13 16:40:01.782 c.d.s.e.p.AggregationSpout I/O dispatcher 31 [INFO] [spout #4]  ES query returned 4750 hits from 2500 buckets in 9481 msec with 0 already being processed. Took 1.996 msec per doc on average.
2018-11-13 16:40:03.698 c.d.s.e.p.AggregationSpout I/O dispatcher 29 [INFO] [spout #0]  ES query returned 4808 hits from 2500 buckets in 11327 msec with 0 already being processed. Took 2.3558652 msec per doc on average.
2018-11-13 16:40:04.118 c.d.s.e.p.AggregationSpout I/O dispatcher 14 [INFO] [spout #8]  ES query returned 4838 hits from 2500 buckets in 11713 msec with 0 already being processed. Took 2.4210417 msec per doc on average.
2018-11-13 16:40:04.244 c.d.s.e.p.AggregationSpout I/O dispatcher 10 [INFO] [spout #6]  ES query returned 4696 hits from 2500 buckets in 11811 msec with 0 already being processed. Took 2.5151193 msec per doc on average.
2018-11-13 16:40:04.804 c.d.s.e.p.AggregationSpout I/O dispatcher 26 [INFO] [spout #2]  ES query returned 4711 hits from 2500 buckets in 12527 msec with 0 already being processed. Took 2.6590958 msec per doc on average.
2018-11-13 16:40:02.338 c.d.s.e.p.AggregationSpout I/O dispatcher 18 [INFO] [spout #3]  ES query returned 4735 hits from 2500 buckets in 9731 msec with 0 already being processed. Took 2.0551214 msec per doc on average.
2018-11-13 16:40:03.884 c.d.s.e.p.AggregationSpout I/O dispatcher 9 [INFO] [spout #9]  ES query returned 4808 hits from 2500 buckets in 10638 msec with 0 already being processed. Took 2.2125623 msec per doc on average.
2018-11-13 16:40:04.057 c.d.s.e.p.AggregationSpout I/O dispatcher 20 [INFO] [spout #7]  ES query returned 4774 hits from 2500 buckets in 11510 msec with 0 already being processed. Took 2.4109762 msec per doc on average.
2018-11-13 16:40:04.285 c.d.s.e.p.AggregationSpout I/O dispatcher 22 [INFO] [spout #1]  ES query returned 4736 hits from 2500 buckets in 11875 msec with 0 already being processed. Took 2.5073903 msec per doc on average.
2018-11-13 16:40:04.386 c.d.s.e.p.AggregationSpout I/O dispatcher 15 [INFO] [spout #5]  ES query returned 4744 hits from 2500 buckets in 12022 msec with 0 already being processed. Took 2.5341485 msec per doc on average

The great thing about it is that I can set a larger number of buckets without affecting the timings too much

2018-11-13 16:43:11.802 c.d.s.e.p.AggregationSpout I/O dispatcher 34 [INFO] [spout #4]  ES query returned 7476 hits from 5000 buckets in 7627 msec with 0 already being processed. Took 1.020198 msec per doc on average.
2018-11-13 16:43:12.433 c.d.s.e.p.AggregationSpout I/O dispatcher 33 [INFO] [spout #2]  ES query returned 8295 hits from 5000 buckets in 7840 msec with 0 already being processed. Took 0.9451477 msec per doc on average.
2018-11-13 16:43:14.160 c.d.s.e.p.AggregationSpout I/O dispatcher 32 [INFO] [spout #0]  ES query returned 8039 hits from 5000 buckets in 8999 msec with 0 already being processed. Took 1.1194178 msec per doc on average.
2018-11-13 16:43:14.237 c.d.s.e.p.AggregationSpout I/O dispatcher 14 [INFO] [spout #8]  ES query returned 8290 hits from 5000 buckets in 9108 msec with 0 already being processed. Took 1.0986731 msec per doc on average.
2018-11-13 16:43:14.524 c.d.s.e.p.AggregationSpout I/O dispatcher 13 [INFO] [spout #6]  ES query returned 7934 hits from 5000 buckets in 9646 msec with 0 already being processed. Took 1.2157801 msec per doc on average.
2018-11-13 16:43:15.022 c.d.s.e.p.AggregationSpout I/O dispatcher 31 [INFO] [spout #7]  ES query returned 7929 hits from 5000 buckets in 8902 msec with 0 already being processed. Took 1.122714 msec per doc on average.
2018-11-13 16:43:15.167 c.d.s.e.p.AggregationSpout I/O dispatcher 33 [INFO] [spout #1]  ES query returned 8264 hits from 5000 buckets in 9196 msec with 0 already being processed. Took 1.1127783 msec per doc on average.
2018-11-13 16:43:15.395 c.d.s.e.p.AggregationSpout I/O dispatcher 1 [INFO] [spout #9]  ES query returned 7575 hits from 5000 buckets in 9448 msec with 0 already being processed. Took 1.2472607 msec per doc on average.
2018-11-13 16:43:15.414 c.d.s.e.p.AggregationSpout I/O dispatcher 20 [INFO] [spout #5]  ES query returned 8069 hits from 5000 buckets in 10750 msec with 0 already being processed. Took 1.3322593 msec per doc on average.
2018-11-13 16:43:15.756 c.d.s.e.p.AggregationSpout I/O dispatcher 19 [INFO] [spout #3]  ES query returned 8330 hits from 5000 buckets in 11278 msec with 0 already being processed. Took 1.3539015 msec per doc on average.

(Jnioche) #12

Thanks @Mark_Harwood, I wasn't aware of this. Out of curiosity, is there a description somewhere of how the partitions are generated?


(Mark Harwood) #13

It's the hash of the term, modulo number of chosen partitions. Basically the same strategy as distributing docs evenly to shards.


(Jimferenczi) #14

In absolute terms, not using inner_hits is faster but when we look at the average time spent per doc this is not the case.

That is weird. What is the average size of your documents ? I wonder if retrieving the source of the documents is the costly part and not the collapsing. Can you try with a smaller size (like 10) and compare with the terms aggregation ?


(Jimferenczi) #15

I did some tests and the collapsing is constantly faster than terms with and without sampling. I tried 20M docs in a single shard with 68k hostnames and a random date for nextFetchDate. One thing that could explain your results is the repartition of the nextFetchDate among the documents. I filled this field randomly but if the smallest nextFecthDate are on new documents then the collapse query is slower. This is due to the fact that we extract the hostname only on documents that are competitive (that can enters the top hits), if the repartition is random we have more chances to skip the extraction (which is costly since hostname are long strings). Can you try to inverse the sort order:

"sort":[{"nextFetchDate":{"order":"desc"}}

Is it possible that the repartition of nextFecthDate value is biased towards new documents (new documents tend to have a smaller nextFecthDate) ?


(Jnioche) #16

227.4m docs takes 44.7 GB, so on average 211 bytes per doc?

The settings for this crawl is really minimal, we only keep what is necessary. A typical document looks like this

{
  "_index": "status",
  "_type": "status",
  "_id": "59d6e813ee44da9702a2c20dbe22a02a5298a55a35bfb1961f3ef3c5173e7fd3",
  "_version": 2,
  "_score": 1,
  "_routing": "olx.ro",
  "_source": {
    "url": "https://www.olx.ro/auto-masini-moto-ambarcatiuni/autoturisme/",
    "status": "FETCHED",
    "metadata": {
      "depth": [
        "3"
      ]
    },
    "hostname": "olx.ro",
    "nextFetchDate": "2099-12-31T00:00:00.000Z"
  },
  "fields": {
    "nextFetchDate": [
      "2099-12-31T00:00:00.000Z"
    ]
  }
}

when displaying the JSON in Kibana.

With 10 buckets of max 2 urls, sampling aggregs

6701/worker.log:2018-11-13 21:26:24.433 c.d.s.e.p.AggregationSpout I/O dispatcher 6 [INFO] [spout #7]  ES query returned 18 hits from 10 buckets in 112 msec with 2 already being processed. Took 6.2222223 msec per doc on average.
6701/worker.log:2018-11-13 21:26:26.325 c.d.s.e.p.AggregationSpout I/O dispatcher 3 [INFO] [spout #7]  ES query returned 18 hits from 10 buckets in 2 msec with 18 already being processed. Took 0.11111111 msec per doc on average.
6701/worker.log:2018-11-13 21:26:28.337 c.d.s.e.p.AggregationSpout I/O dispatcher 3 [INFO] [spout #7]  ES query returned 18 hits from 10 buckets in 4 msec with 18 already being processed. Took 0.22222222 msec per doc on average.
6701/worker.log:2018-11-13 21:26:30.338 c.d.s.e.p.AggregationSpout I/O dispatcher 7 [INFO] [spout #7]  ES query returned 18 hits from 10 buckets in 4 msec with 18 already being processed. Took 0.22222222 msec per doc on average.
6701/worker.log:2018-11-13 21:26:32.345 c.d.s.e.p.AggregationSpout I/O dispatcher 3 [INFO] [spout #7]  ES query returned 18 hits from 10 buckets in 2 msec with 18 already being processed. Took 0.11111111 msec per doc on average

We're reusing the date for the queries after the first one and they run so fast that the content of the index hasn't had time to change. The URLs returned are still being processed.

Now with the field collapsing

6701/worker.log:2018-11-13 21:31:10.140 c.d.s.e.p.CollapsingSpout I/O dispatcher 24 [INFO] [spout #7]  ES query returned 20 hits from 10 buckets in 5388 msec with 0 already being processed.Took 269.4 msec per doc on average.
6701/worker.log:2018-11-13 21:31:13.894 c.d.s.e.p.CollapsingSpout I/O dispatcher 24 [INFO] [spout #7]  ES query returned 20 hits from 10 buckets in 3730 msec with 0 already being processed.Took 186.5 msec per doc on average.
6701/worker.log:2018-11-13 21:31:16.402 c.d.s.e.p.CollapsingSpout I/O dispatcher 23 [INFO] [spout #7]  ES query returned 20 hits from 10 buckets in 2464 msec with 0 already being processed.Took 123.2 msec per doc on average.
6701/worker.log:2018-11-13 21:31:18.773 c.d.s.e.p.CollapsingSpout I/O dispatcher 23 [INFO] [spout #7]  ES query returned 20 hits from 10 buckets in 2370 msec with 0 already being processed.Took 118.5 msec per doc on average.
6701/worker.log:2018-11-13 21:31:22.855 c.d.s.e.p.CollapsingSpout I/O dispatcher 23 [INFO] [spout #7]  ES query returned 20 hits from 10 buckets in 4079 msec with 0 already being processed.Took 203.95 msec per doc on average.
6701/worker.log:2018-11-13 21:31:26.258 c.d.s.e.p.CollapsingSpout I/O dispatcher 23 [INFO] [spout #7]  ES query returned 20 hits from 10 buckets in 3393 msec with 0 already being processed.Took 169.65 msec per doc on average.
6701/worker.log:2018-11-13 21:31:29.148 c.d.s.e.p.CollapsingSpout I/O dispatcher 23 [INFO] [spout #7]  ES query returned 20 hits from 10 buckets in 2836 msec with 0 already being processed.Took 141.8 msec per doc on average.

(Jnioche) #17

new documents tend to have a smaller nextFecthDate

Not sure what you mean by that. The size of the nextFetchDate should be the same in all cases.

Most URLs in the crawl are not yet fetched, their nextFetchDate is set to the instant they were discovered. Once they have been fetched, they get a nextFetchDate set in a more or less distant future.
nextFetchDate actually means: don't try to fetch that URL until that date.

They are declared like so

"nextFetchDate": {
					"type": "date",
					"format": "dateOptionalTime"
				},

see https://github.com/DigitalPebble/storm-crawler/blob/master/external/elasticsearch/ES_IndexInit.sh#L38

Changing the order for the sort does not seem to make much of a difference, here Collapsing with 2.5K buckets and 2 urls per bucket

6700/worker.log:2018-11-13 21:44:07.136 c.d.s.e.p.CollapsingSpout I/O dispatcher 26 [INFO] [spout #4]  ES query returned 4319 hits from 2500 buckets in 8622 msec with 0 already being processed.Took 1.9962955 msec per doc on average.
6700/worker.log:2018-11-13 21:44:11.075 c.d.s.e.p.CollapsingSpout I/O dispatcher 29 [INFO] [spout #0]  ES query returned 4155 hits from 2500 buckets in 12203 msec with 0 already being processed.Took 2.9369435 msec per doc on average.
6700/worker.log:2018-11-13 21:44:21.463 c.d.s.e.p.CollapsingSpout I/O dispatcher 16 [INFO] [spout #8]  ES query returned 4345 hits from 2500 buckets in 22992 msec with 0 already being processed.Took 5.2915998 msec per doc on average.
6700/worker.log:2018-11-13 21:44:24.373 c.d.s.e.p.CollapsingSpout I/O dispatcher 11 [INFO] [spout #6]  ES query returned 4274 hits from 2500 buckets in 25932 msec with 0 already being processed.Took 6.0673842 msec per doc on average.
6700/worker.log:2018-11-13 21:44:28.060 c.d.s.e.p.CollapsingSpout I/O dispatcher 22 [INFO] [spout #2]  ES query returned 4295 hits from 2500 buckets in 29584 msec with 0 already being processed.Took 6.8880095 msec per doc on average.
6701/worker.log:2018-11-13 21:44:12.775 c.d.s.e.p.CollapsingSpout I/O dispatcher 36 [INFO] [spout #9]  ES query returned 4326 hits from 2500 buckets in 14065 msec with 0 already being processed.Took 3.2512715 msec per doc on average.
6701/worker.log:2018-11-13 21:44:13.080 c.d.s.e.p.CollapsingSpout I/O dispatcher 35 [INFO] [spout #7]  ES query returned 4107 hits from 2500 buckets in 14936 msec with 0 already being processed.Took 3.6367178 msec per doc on average.
6701/worker.log:2018-11-13 21:44:17.684 c.d.s.e.p.CollapsingSpout I/O dispatcher 19 [INFO] [spout #1]  ES query returned 4099 hits from 2500 buckets in 19400 msec with 0 already being processed.Took 4.7328615 msec per doc on average.
6701/worker.log:2018-11-13 21:44:17.898 c.d.s.e.p.CollapsingSpout I/O dispatcher 26 [INFO] [spout #3]  ES query returned 4172 hits from 2500 buckets in 19743 msec with 0 already being processed.Took 4.7322626 msec per doc on average.
6701/worker.log:2018-11-13 21:44:44.162 c.d.s.e.p.CollapsingSpout I/O dispatcher 26 [INFO] [spout #5]  ES query returned 4251 hits from 2500 buckets in 15983 msec with 0 already being processed.Took 3.7598212 msec per doc on average.

BTW this is where the code of the spouts that generate the queries lives:
https://github.com/DigitalPebble/storm-crawler/tree/master/external/elasticsearch/src/main/java/com/digitalpebble/stormcrawler/elasticsearch/persistence

Thanks a lot, I really appreciate you taking the time to help me understand it better.


(Jimferenczi) #18

6701/worker.log:2018-11-13 21:31:10.140 c.d.s.e.p.CollapsingSpout I/O dispatcher 24 [INFO] [spout #7] ES query returned 20 hits from 10 buckets in 5388 msec with 0 already being processed.Took 269.4 msec per doc on average.

That's really slow and I cannot explain why :(. Can you test with:
"sort":["_doc"] ?
Which version of ES are you testing ?
Since I am not able to reproduce with random data would it be possible to get a copy of one shard ?


(Jnioche) #19
6700/worker.log:2018-11-14 08:19:09.794 c.d.s.e.p.CollapsingSpout I/O dispatcher 2 [INFO] [spout #4]  ES query returned 2500 hits from 2500 buckets in 8217 msec with 0 already being processed.Took 3.2868 msec per doc on average.
6700/worker.log:2018-11-14 08:19:13.447 c.d.s.e.p.CollapsingSpout I/O dispatcher 3 [INFO] [spout #0]  ES query returned 2500 hits from 2500 buckets in 12048 msec with 0 already being processed.Took 4.8192 msec per doc on average.
6700/worker.log:2018-11-14 08:19:20.323 c.d.s.e.p.CollapsingSpout I/O dispatcher 4 [INFO] [spout #6]  ES query returned 2500 hits from 2500 buckets in 18804 msec with 0 already being processed.Took 7.5216 msec per doc on average.
6700/worker.log:2018-11-14 08:19:24.171 c.d.s.e.p.CollapsingSpout I/O dispatcher 5 [INFO] [spout #8]  ES query returned 2500 hits from 2500 buckets in 22874 msec with 0 already being processed.Took 9.1496 msec per doc on average.
6700/worker.log:2018-11-14 08:19:25.731 c.d.s.e.p.CollapsingSpout I/O dispatcher 1 [INFO] [spout #2]  ES query returned 2500 hits from 2500 buckets in 24403 msec with 0 already being processed.Took 9.7612 msec per doc on average.
6701/worker.log:2018-11-14 08:19:16.100 c.d.s.e.p.CollapsingSpout I/O dispatcher 9 [INFO] [spout #9]  ES query returned 2500 hits from 2500 buckets in 14229 msec with 0 already being processed.Took 5.6916 msec per doc on average.
6701/worker.log:2018-11-14 08:19:19.095 c.d.s.e.p.CollapsingSpout I/O dispatcher 14 [INFO] [spout #7]  ES query returned 2500 hits from 2500 buckets in 17504 msec with 0 already being processed.Took 7.0016 msec per doc on average.
6701/worker.log:2018-11-14 08:19:20.875 c.d.s.e.p.CollapsingSpout I/O dispatcher 6 [INFO] [spout #1]  ES query returned 2500 hits from 2500 buckets in 19563 msec with 0 already being processed.Took 7.8252 msec per doc on average.
6701/worker.log:2018-11-14 08:19:21.661 c.d.s.e.p.CollapsingSpout I/O dispatcher 1 [INFO] [spout #3]  ES query returned 2500 hits from 2500 buckets in 20310 msec with 0 already being processed.Took 8.124 msec per doc on average.
6701/worker.log:2018-11-14 08:19:24.030 c.d.s.e.p.CollapsingSpout I/O dispatcher 12 [INFO] [spout #5]  ES query returned 2500 hits from 2500 buckets in 22792 msec with 0 already being processed.Took 9.1168 msec per doc on average.

Here is what the queries look like

2018-11-14 08:24:09.797 c.d.s.e.p.CollapsingSpout Thread-31-spout-executor[26 26] [DEBUG] [spout #7]  ES query SearchRequest{searchType=QUERY_THEN_FETCH, indices=[status], indicesOptions=IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false], types=[status], routing='null', preference='_shards:7', requestCache=null, scroll=null, maxConcurrentShardRequests=0, batchedReduceSize=512, preFilterShardSize=128, allowPartialSearchResults=null, source={"from":0,"size":2500,"query":{"range":{"nextFetchDate":{"from":null,"to":"2018-11-14T08:19:00Z","include_lower":true,"include_upper":true,"boost":1.0}}},"explain":false,"sort":[{"_doc":{"order":"asc"}}],"track_total_hits":false,"collapse":{"field":"hostname"}}}
2018-11-14 08:24:17.536 c.d.s.e.p.CollapsingSpout I/O dispatcher 12 [INFO] [spout #7]  ES query returned 2500 hits from 2500 buckets in 7724 msec with 525 already being processed.Took 3.0896 msec per doc on average.

Am on ES 6.4.3

Shard: sure! Is there a way I can find the physical location of a shard?

_curl -X GET "localhost:9200/cat/shards/status"

does not give me any info I can relate to what I see in /var/lib/elasticsearch/nodes/0/indices

My smallest shard is 2.2GB large, where can I store it for you to access?


(Jimferenczi) #20

Shard: sure! Is there a way I can find the physical location of a shard?

It would be better to create a snapshot. You can reindex the content of a single shard in a new index with the following command:

POST _reindex
{
  "source": {
    "index": "source_index",
    "slice": {
      "id": 2, # the shard id to reindex
      "max": 5 # the total number of shards in the index
    }
  },
  "dest": {
    "index": "dest_index"
  }
}

Then you can create a snapshot of the dest_index in a fs repository:
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html

My smallest shard is 2.2GB large, where can I store it for you to access?

I can set up an S3 bucket if you'd like or you can send it privately to me via gmail (it will be uploaded automatically to google drive).