Script Aggregations gather phase on client node

Hi Community!

We have a test cluster running Elasticsearch 2.1.1. It consists of 4 data nodes (12 CPU cores, 30 GB heap each) and one client node (12 CPU cores, 30 GB heap).

There are 19 large indexes (~2.5 billion docs each), each with 32 shards (16 primaries, 1 replica per primary).

We are continuously indexing ~15,000 docs/second.
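To put those numbers in perspective, here is a quick back-of-the-envelope calculation. It only uses the figures quoted above and assumes that shards are spread evenly across the data nodes, which may not hold exactly in practice.

```javascript
// Figures taken from the cluster description above.
const indexes = 19;
const primariesPerIndex = 16;
const replicasPerPrimary = 1;
const dataNodes = 4;
const docsPerIndex = 2.5e9;

// Each primary has one replica, so every index has 16 * 2 shard copies.
const shardsPerIndex = primariesPerIndex * (1 + replicasPerPrimary); // 32
const totalShards = indexes * shardsPerIndex;                        // 608

// Assumption: perfectly even shard allocation across the data nodes.
const shardsPerDataNode = totalShards / dataNodes;                   // 152

// Average number of documents held by a single primary shard.
const docsPerPrimary = docsPerIndex / primariesPerIndex;             // 156250000

console.log({ shardsPerIndex, totalShards, shardsPerDataNode, docsPerPrimary });
```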

The documents have 8 fields, and each has a unique document identifier (custom_doc_id). We tried enabling routing on that identifier, but it seems to decrease speed.

We are running the following scripted aggregation query against the client node using the Node.js client library:


esclient.search({
    index: indexHistoric,
    search_type: 'count',
    //routing: true,
    body: {
        from: 0,
        //sort: ["_doc"],
        query: {
            "filtered": {
                "filter": {
                    "bool": {
                        "must": [field_1, field_2, field_3],
                        "should": dates
                    }
                }
            }
        },
        "aggs": {
            "agg_1": {
                "scripted_metric": {
                    "init_script": "_agg['custom_doc_id'] = [:]",
                    "map_script": "if(!_agg['custom_doc_id'].hasProperty(doc['custom_doc_id'].value)) { _agg['custom_doc_id'][doc['custom_doc_id'].value] = doc['_index'].value }",
                    "reduce_script": "occurrence = [:]; output=[]; for(m in _aggs['custom_doc_id']) { for(j in m) { if (!occurrence.containsKey(j.getKey())) { occurrence[j.getKey()] = []; occurrence[j.getKey()].add(j.getValue()); } else { if (!occurrence[j.getKey()].containsAll(j.getValue())) { occurrence[j.getKey()].add(j.getValue());} } } }; for(n in occurrence) { if (occurrence[n.getKey()].size()>=" + numOccurrence + ") { output.add(n.getKey()); }; }; _aggs.clear(); occurrence.clear();return output; "
                }
            }
        }
    }
}, process_es_history_reply.bind({
    req: req
}));
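For readers who do not want to parse the one-line Groovy, the intent of the reduce_script can be sketched in plain JavaScript. This is only an illustration of the logic, not code that runs inside Elasticsearch: `reduceShardResults` is a hypothetical helper name, and the sample shard maps and index names are made up. Each shard returns a map of custom_doc_id to index name, and the reduce step keeps the ids that were seen in at least numOccurrence distinct indexes.

```javascript
// Hypothetical helper mirroring the intent of the Groovy reduce_script above.
// shardResults: array of per-shard maps { custom_doc_id: indexName }.
function reduceShardResults(shardResults, numOccurrence) {
  // custom_doc_id -> set of distinct index names in which it was seen
  const occurrence = new Map();
  for (const shardMap of shardResults) {
    for (const [docId, indexName] of Object.entries(shardMap)) {
      if (!occurrence.has(docId)) {
        occurrence.set(docId, new Set());
      }
      occurrence.get(docId).add(indexName);
    }
  }
  // Keep only ids found in at least numOccurrence distinct indexes.
  const output = [];
  for (const [docId, indexNames] of occurrence) {
    if (indexNames.size >= numOccurrence) {
      output.push(docId);
    }
  }
  return output;
}

// Made-up sample: three shards from three illustrative indexes.
const sample = [
  { a: 'idx-1', b: 'idx-1' },
  { a: 'idx-2' },
  { b: 'idx-1', c: 'idx-3' }
];
console.log(reduceShardResults(sample, 2)); // [ 'a' ]
```

Note that the merged map holds one entry per distinct custom_doc_id across all shard results, so its memory footprint grows with the number of matching ids.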

We are searching over 10 indexes and it takes around 22 minutes. This is not bad considering the huge amount of data.

However, we are wondering why we never see any load on the client node. The gather phase should be performed on the coordinating node (https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html). We also observe that all search threads are active on only one data node (not the client node) at a time. Is there a way to distribute the threads across the whole cluster to increase performance?

Any advice on optimizing our query or setup is very welcome.

Thanks!