ES not returning results when doing multi-threaded msearch

A little background: what follows is probably not what Elasticsearch is designed for, but it is the most efficient approach I can think of for my goal.
Setup and goal: I have an index (ES 1.7) with 800k documents, <500 MB in size (each document is quite small). I have over 1 million queries to run against this index, returning only the top 25 documents each time. I want to complete this in under 1 hour.
Current failed attempt (I have tried many other approaches, but they all fail with the same problem):

  1. I am using the Python requests package;
  2. The queries are partitioned into 60 partitions of around 20,000 queries each;
  3. I created 15 identical indexes (2 shards, 0 replicas);
  4. I loop through the partitions, doing the following (see the sketch after this list):
    4a. I use the msearch endpoint;
    4b. I use multiprocessing with 15 processes (each process using one of the 15 identical indexes);
    4c. In each process, I send 10 queries at a time through msearch.
  5. Each partition (at least the first 3) takes only around 20 seconds to finish.
  6. When I reach the 4th partition, the problem arises: I get empty responses (response.text is empty).
  7. After waiting a little, I reran the 4th partition and it finished successfully.
  8. The 5th partition failed again.
  9. After waiting longer, I could run the 5th, 6th, and 7th, but the 8th failed.
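
To make this concrete, here is roughly what one partition looks like in my script (the host, index names, and placeholder queries are illustrative, not my real ones):

import json
import requests
from multiprocessing import Pool

ES_URL = 'http://xxx:9200'   # placeholder host
INDEXES = ['index%d' % n for n in range(1, 16)]  # the 15 identical indexes (placeholder names)
BATCH_SIZE = 10              # queries sent per msearch call

def run_queries(args):
    index, queries = args
    results = []
    for i in range(0, len(queries), BATCH_SIZE):
        lines = []
        for q in queries[i:i + BATCH_SIZE]:
            lines.append(json.dumps({'index': index}))          # msearch header line
            lines.append(json.dumps({'query': q, 'size': 25}))  # query line, top 25 docs
        body = '\n'.join(lines) + '\n'                          # NDJSON, trailing newline required
        resp = requests.get(ES_URL + '/_msearch', data=body)
        results.append(resp.json())
    return results

if __name__ == '__main__':
    partition = [{'match_all': {}}] * 20000  # placeholder for one partition of ~20,000 queries
    chunk = len(partition) // len(INDEXES)
    work = [(idx, partition[i * chunk:(i + 1) * chunk]) for i, idx in enumerate(INDEXES)]
    with Pool(len(INDEXES)) as pool:  # 15 processes, one per identical index
        all_results = pool.map(run_queries, work)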

My suspicion is that there is some caching going on: when I run many queries in a short time period, the cache fills up; when I stop querying, more and more of the cache is released as the quiet time gets longer.

I am not an engineer, just a user, so I don't really know what is going on under the hood. Can someone point out what the problem is? Is there anything I can do to stop that "caching"?

Thanks in advance,
Wei

Why are you using such an old version?

Given the size, a single primary shard might be better. As you do not seem to be writing to it, you may also benefit from force-merging it down to a single segment.

Why have you done this (created 15 identical indexes)?

What is the specification of your cluster? Do you have monitoring installed so you can see what is limiting performance?

I don't have control over this (the version).

I once did it on a single index and the operations made the index go red.

Below are the stats from running _cluster/stats:
{
  "timestamp": 1531499575241,
  "cluster_name": "dev-esapp2-17",
  "status": "green",
  "indices": {
    "count": 56,
    "shards": {
      "total": 363,
      "primaries": 193,
      "replication": 0.8808290155440415,
      "index": {
        "shards": {
          "min": 1,
          "max": 15,
          "avg": 6.482142857142857
        },
        "primaries": {
          "min": 1,
          "max": 5,
          "avg": 3.4464285714285716
        },
        "replication": {
          "min": 0,
          "max": 2,
          "avg": 0.75
        }
      }
    },
    "docs": {
      "count": 87904331,
      "deleted": 16087362
    },
    "store": {
      "size_in_bytes": 444425411554,
      "throttle_time_in_millis": 1773336
    },
    "fielddata": {
      "memory_size_in_bytes": 228118236,
      "evictions": 0
    },
    "filter_cache": {
      "memory_size_in_bytes": 2601705780,
      "evictions": 0
    },
    "id_cache": {
      "memory_size_in_bytes": 0
    },
    "completion": {
      "size_in_bytes": 0
    },
    "segments": {
      "count": 6315,
      "memory_in_bytes": 2415717542,
      "index_writer_memory_in_bytes": 0,
      "index_writer_max_memory_in_bytes": 6912139264,
      "version_map_memory_in_bytes": 0,
      "fixed_bit_set_memory_in_bytes": 23999296
    },
    "percolate": {
      "total": 0,
      "time_in_millis": 0,
      "current": 0,
      "memory_size_in_bytes": -1,
      "memory_size": "-1b",
      "queries": 0
    }
  },
  "nodes": {
    "count": {
      "total": 15,
      "master_only": 3,
      "data_only": 9,
      "master_data": 0,
      "client": 0
    },
    "versions": [
      "1.7.5"
    ],
    "os": {
      "available_processors": 108,
      "mem": {
        "total_in_bytes": 731218132992
      },
      "cpu": [
        {
          "vendor": "Intel",
          "model": "Xeon",
          "mhz": 2300,
          "total_cores": 8,
          "total_sockets": 1,
          "cores_per_socket": 8,
          "cache_size_in_bytes": 46080,
          "count": 15
        }
      ]
    },
    "process": {
      "cpu": {
        "percent": 143
      },
      "open_file_descriptors": {
        "min": 547,
        "max": 1798,
        "avg": 1289
      }
    },
    "jvm": {
      "max_uptime_in_millis": 2056273349,
      "versions": [
        {
          "version": "1.8.0_151",
          "vm_name": "OpenJDK 64-Bit Server VM",
          "vm_version": "25.151-b12",
          "vm_vendor": "Oracle Corporation",
          "count": 8
        },
        {
          "version": "1.8.0_102",
          "vm_name": "OpenJDK 64-Bit Server VM",
          "vm_version": "25.102-b14",
          "vm_vendor": "Oracle Corporation",
          "count": 7
        }
      ],
      "mem": {
        "heap_used_in_bytes": 160690785248,
        "heap_max_in_bytes": 350172217344
      },
      "threads": 1426
    },
    "fs": {
      "total_in_bytes": 4779726888960,
      "free_in_bytes": 3695000637440,
      "available_in_bytes": 3451921522688
    },
    "plugins": [
      {
        "name": "head",
        "version": "NA",
        "description": "No description found.",
        "url": "/_plugin/head/",
        "jvm": false,
        "site": true
      },
      {
        "name": "cloud-aws",
        "version": "2.7.1",
        "description": "Cloud AWS Plugin",
        "jvm": true,
        "site": false
      }
    ]
  }
}

What kind of monitoring are you referring to? Which tool should I use? Which fields should I pay attention to?

Thank you so much,
Wei

I do not see how having 15 identical indices will help performance. If you just had a single one, it would almost certainly be cached. What did you do that made it go red?

I would recommend keeping just one of the identical indices (possibly with a single primary shard) and increasing the number of replicas so that every data node in the cluster has a copy, e.g. 8 replicas if you have 9 data nodes. Before you increase the number of replicas, run optimize on it with max_num_segments set to 1.
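
In 1.x those are the _optimize endpoint and a settings update; a rough sketch using the same Python requests package (host and index name are placeholders):

import requests

ES = 'http://xxx:9200'  # placeholder host
INDEX = 'index1'        # placeholder: the one index you keep

# merge down to a single segment first, while there is only one copy of the data
requests.post(ES + '/' + INDEX + '/_optimize', params={'max_num_segments': 1})

# then add replicas so every data node holds a copy (8 replicas for 9 data nodes)
requests.put(ES + '/' + INDEX + '/_settings',
             data='{"index": {"number_of_replicas": 8}}')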

Do you have any non-default settings in your config?

I don't know exactly what caused the index to go red, since, as I later found out, the cluster was not stable at the time. What I did was run msearch with 20,000 queries at once, and I did this 3 times. Not sure if that was the cause.

I've tried it with more replicas (6) but with 2 primary shards (we have 15 nodes). Let me try with 1 primary shard and 13 replicas?

For the indexes, everything is default other than max_result_window, which is set to 3000000.

The stats you provided indicate that you have 9 data nodes, which is why I suggested 8 replicas.

That (20,000 queries in one msearch) seems excessive. It is better to send a larger number of smaller batches (100 queries?) rather than a few large ones, as that uses up fewer resources. Make sure you distribute requests across all data nodes in the cluster.

How do I achieve this? Does ES do that automatically, or do I have to manually direct requests to specific nodes?

Thanks.

How are you querying the data? Using one of the language clients?

I am using the Python requests package:

import requests

uri = 'http://xxx:9200/index1/_msearch'
# body is newline-delimited JSON (one header line and one query line per search), ending with a newline
response = requests.get(uri, data=body)

Then you can provide the addresses of multiple nodes to it, and it will distribute requests across them.
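
For example, with the official elasticsearch-py client (an assumption, as any of the language clients works similarly; the node addresses are placeholders), which round-robins across the hosts you give it:

from elasticsearch import Elasticsearch

# list the data nodes; the client distributes requests across them by default
es = Elasticsearch(['http://node1:9200', 'http://node2:9200', 'http://node3:9200'])

body = '{"index": "index1"}\n{"query": {"match_all": {}}, "size": 25}\n'  # placeholder msearch body
response = es.msearch(body=body)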

I only know one URL that points to these multiple nodes. Is this one URL sufficient? Will it distribute requests across them automatically?

Sorry if the question seems dumb.

Thanks,
Wei
