Getting org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

I set up elasticsearch-1.4.2 and imported ~150 million records from MySQL into Elasticsearch.
Everything worked fine, but after 2 days I suddenly started getting the exception below.

The exception is posted at http://pastebin.com/1Zmhd6UZ.

I have only one data node, and everything is running with the default Elasticsearch configuration.
I have written a scheduler which runs every 6 hours and inserts 0.5 million records into Elasticsearch.

Please suggest how to solve this issue.

This is the way that elasticsearch tells you that you are sending data too quickly compared to what it can ingest. When you get such an exception, you should slow down the ingestion rate a bit.

Does changing the ingestion rate mean changing the amount of data inserted into Elasticsearch?

Yes. This message means that you are sending too much data in parallel compared to what elasticsearch can handle, so you need to reduce the number of threads/workers that send data to elasticsearch, at least temporarily.
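As a rough illustration of slowing down when a request is rejected, here is a minimal Python sketch of retrying with exponential backoff. The `Rejected` exception, the `send` callable and the delay values are assumptions made for the example, not part of any Elasticsearch client; `send` stands for whatever function submits a batch and raises when the node answers with status 429.

```python
import time


class Rejected(Exception):
    """Hypothetical marker for a rejected request (HTTP 429)."""


def send_with_backoff(send, batch, max_retries=5, base_delay=0.5, sleep=time.sleep):
    """Call send(batch); on rejection wait 0.5s, 1s, 2s, ... and try again.

    `send` is a caller-supplied function (an assumption of this sketch).
    The last rejection is re-raised once max_retries is exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            return send(batch)
        except Rejected:
            if attempt == max_retries:
                raise
            # Double the pause after every rejection to give the queue time to drain.
            sleep(base_delay * 2 ** attempt)
```

Reducing the number of parallel workers, as suggested above, is still the first thing to try; the backoff only keeps a client from hammering a node that is already rejecting work.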

Can you please help me understand the following?
While importing ~150 million records from MySQL into Elasticsearch I did not get this error, but I do get it during the scheduler runs.
Is there any specific reason for this?

If you did the indexing in similar conditions then this is a bit surprising.

Here is how I am doing the indexing:

A. To Fetch ~150 million records

curl -XPUT 'http://XXX.X.X.XXX:9200/_river/blk_jdbc_river/_meta' -d '{
  "type" : "jdbc",
  "jdbc" : {
    "strategy" : "simple",
    "autocommit" : true,
    "driver" : "com.mysql.jdbc.Driver",
    "url" : "jdbc:mysql://localhost:3306/KA",
    "user" : "",
    "password" : "**",
    "sql" : [
      {
        "statement" : "select id as \"_id\", p_id as \"pid\", s_id as \"sid\", d_name as \"dname\" from sohil"
      }
    ],
    "maxbulkactions" : 5000,
    "maxconcurrrentbulkactions" : 1,
    "index" : "blks",
    "type" : "blk",
    "type_mapping" : {"blks" : {"properties" : {"_id":{"type":"long","store":"yes"},"dname":{"type":"string","store":"yes","index":"not_analyzed"},"pid":{"type":"long","store":"yes"},"sid":{"type":"long","store":"yes"}}}}
  }
}'

B. To fetch the delta on every run (frequency: every 6 hours)

curl -XPUT 'http://XXX.X.X.XXX:9200/_river/update_blk_jdbc_river/_meta' -d '{
  "type" : "jdbc",
  "jdbc" : {
    "strategy" : "simple",
    "autocommit" : true,
    "schedule" : "0 /6 * * * ?",
    "driver" : "com.mysql.jdbc.Driver",
    "url" : "jdbc:mysql://localhost:3306/KA",
    "user" : "",
    "password" : "***",
    "maxbulkactions" : 5000,
    "maxconcurrrentbulkactions" : 1,
    "sql" : [
      {
        "statement" : "select id as \"_id\", p_id as \"pid\", s_id as \"sid\", d_name as \"dname\" from sohil where id > XXX"
      }
    ],
    "index" : "blks",
    "type" : "blk",
    "type_mapping" : {"blks" : {"properties" : {"_id":{"type":"long","store":"yes"},"dname":{"type":"string","store":"yes","index":"not_analyzed"},"pid":{"type":"long","store":"yes"},"sid":{"type":"long","store":"yes"}}}}
  }
}'

Please let me know in case I am doing anything wrong.

I see that the rejected execution exception is caused by the search thread pool queue reaching its capacity of 1000. What kind of queries are running through the system?

Hi @Srinath_C,

There are three types of queries:

  1. Search queries (through the Java client)
  2. Delete queries (through a river, which marks records in Elasticsearch for deletion)
  3. Insert queries (through the scheduler)

I forgot to mention that there is one more scheduler, which runs every 5 hours and deletes records from Elasticsearch.

Below is the river configuration for deleting records from Elasticsearch:

curl -XPUT 'http://XX.XXX.XXX.XX:9200/_river/delete_blklist_jdbc_river/_meta' -d '{
  "type" : "jdbc",
  "jdbc" : {
    "strategy" : "simple",
    "schedule" : "0 0 0/5 * * ?",
    "autocommit" : true,
    "driver" : "com.mysql.jdbc.Driver",
    "url" : "jdbc:mysql://XX.XXX.XXX.XX:3306/KA",
    "user" : "XXXX",
    "password" : "XXXXXX",
    "maxbulkactions" : 5000,
    "maxconcurrrentbulkactions" : 1,
    "sql" : "select id as \"_id\", \"delete\" as \"_optype\", \"blks\" as \"_index\", \"blk\" as \"_type\" from deleted_id_table"
  }
}'

The message

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@220dbcbd

is not directly related to the JDBC river, because the JDBC river does not execute search requests at all.

You should find out who keeps sending search requests to your cluster. You also have to find out whether the search requests return useful results in a reasonable time or whether they time out. If they time out, search requests can fill up the search queue very quickly, within seconds, even with a queue size of 1000. Most probably the cluster state has changed, or search cannot continue because of cluster or index blocks or some other incident.
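To check whether the search queue is actually full, one option is to look at the output of `GET /_cat/thread_pool?v&h=host,search.active,search.queue,search.rejected`. The Python sketch below only parses such a table; it assumes the column names of the 1.x cat API, and the sample text is made up for illustration rather than taken from the poster's cluster.

```python
def parse_cat_table(text):
    """Turn whitespace-separated _cat output (with a header row) into dicts."""
    lines = [ln for ln in text.splitlines() if ln.strip()]
    header = lines[0].split()
    return [dict(zip(header, ln.split())) for ln in lines[1:]]


def saturated_nodes(rows, queue_capacity=1000):
    """Return hosts whose search queue is full or that have rejected searches."""
    return [
        row["host"]
        for row in rows
        if int(row["search.queue"]) >= queue_capacity
        or int(row["search.rejected"]) > 0
    ]


# Illustrative sample only (hypothetical host names and numbers).
SAMPLE = """host  search.active search.queue search.rejected
node1 3             1000         4821
node2 0             0            0
"""

if __name__ == "__main__":
    print(saturated_nodes(parse_cat_table(SAMPLE)))
```

A steadily growing `search.rejected` counter while `search.queue` sits at its limit would point to searches arriving faster than they complete.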

Hi @jprante,

Thanks for your answer. I will try to find out the root cause.
Is there any way to recover from this type of exception?

Thanks,
Sohil

Hi,

I have identified and stopped the client that was continuously sending search requests.
After that I rebuilt the Elasticsearch index, but after two days I started getting the same exception again.
Here is the output of the cluster and index status: http://pastebin.com/c56wZmEf
It would be great if someone could help me resolve this.

You do not need to rebuild the index only because of a search exception.

The information you give is not sufficient.

You should post a copy of the exception you get now, the configuration of your cluster, of your index, and what the identified search request looks like.
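For collecting that information, the following Python sketch lists REST endpoints that return cluster health, cluster settings, node statistics, index settings and the index mapping. The helper name `diagnostic_urls` and the base URL are assumptions for the example; the index name `blks` comes from this thread.

```python
def diagnostic_urls(base_url, index):
    """Build the URLs whose JSON output is worth attaching to a report.

    base_url is assumed to look like "http://host:9200" (hypothetical).
    """
    base = base_url.rstrip("/")
    paths = [
        "/_cluster/health",       # overall status and shard counts
        "/_cluster/settings",     # persistent and transient cluster settings
        "/_nodes/stats",          # heap, thread pools, and other per-node stats
        "/%s/_settings" % index,  # index-level settings
        "/%s/_mapping" % index,   # field mappings
    ]
    return [base + path + "?pretty" for path in paths]


if __name__ == "__main__":
    for url in diagnostic_urls("http://localhost:9200", "blks"):
        print("curl -XGET '%s'" % url)
```

Each printed line can be run as-is with curl, and the output pasted alongside the exception.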

Hi @jprante,

Here is the exception I am getting now: http://pastebin.com/xwAEZDqn
When I run a normal search call, I get:
curl -XGET 'XX.XXX.XXX.XX:9200/blks/_search?pretty&q=dname:2010.census.gov'
{
"error" : "SearchPhaseExecutionException[Failed to execute phase [query], all shards failed; shardFailures {[-y2z8tICS_6mywTPw77x2A][blks][0]: EsRejectedExecutionException[rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@49cb7fbc]}{[-y2z8tICS_6mywTPw77x2A][blks][1]: EsRejectedExecutionException[rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@4e781ac0]}{[-y2z8tICS_6mywTPw77x2A][blks][2]: EsRejectedExecutionException[rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@7bf30706]}{[-y2z8tICS_6mywTPw77x2A][blks][3]: EsRejectedExecutionException[rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@eb2ccea]}{[-y2z8tICS_6mywTPw77x2A][blks][4]: EsRejectedExecutionException[rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@464adf9a]}]",
"status" : 429
}

Can you please point me to the API through which I can get the configuration of the cluster and of the index?

Regarding the search request, do you want the actual search query that I am running to fetch data from ES?

Thanks,
Sohil

Hi @jprante,

I am getting the same exception every two days.
Is it because I have only a single Elasticsearch node?

You mean you only use a single node???

From the logs you published on Pastebin ("ElasticSearch Exception log file"), it is quite clear that your cluster runs out of resources. You need to give the node more resources, or add more nodes to your cluster to get more heap and CPU power.

Hi @jprante,

I am using only one elasticsearch node.

My mapping is

{
  "blks": {
    "mappings": {
      "blk": {
        "properties": {
          "dname": {
            "type": "string"
          },
          "pid": {
            "type": "long"
          },
          "sid": {
            "type": "long"
          }
        }
      }
    }
  }
}

and my dname field is not_analyzed.

I execute only the following two types of queries on my Elasticsearch node.

  1. Search on dname field :- (exact match) OR (ends with)

[2015-09-07 22:56:19,554][TRACE][index.search.slowlog.query] [Plug] [blks][0] took[39.6ms], took_millis[39], types[blk], stats[], search_type[DFS_QUERY_THEN_FETCH], total_shards[5], source[{"from":0,"size":750000,"query":{"bool":{"must":{"query_string":{"query":"abc.com","fields":["dname"]}},"should":{"query_string":{"query":"*.abc.com","fields":["dname"]}}}},"post_filter":{"or":{"filters":[]}}}], extra_source[],

  2. Search on pid and sid field :-

Query :- pid=X AND sid=Y

[2015-09-07 22:56:19,554][TRACE][index.search.slowlog.query] [Plug] [blks][1] took[7.1ms], took_millis[7], types[blk], stats[], search_type[DFS_QUERY_THEN_FETCH], total_shards[5], source[{"from":0,"size":750000,"query":{"bool":{"must":[{"query_string":{"query":"504","fields":["pid"]}},{"query_string":{"query":"0","fields":["sid"]}}]}}}], extra_source[],

Please let me know if you need more details.
I am currently using elasticsearch-1.7.1 in production.
Your suggestions are always welcome.

@sohilelasticsearch did you try installing Marvel and looking at the queue size and other parameters? It is definitely worth a try.

You could also try adjusting the "maxbulkactions":5000 and "maxconcurrrentbulkactions":1 parameters.

@Srinath_C I have already set "maxbulkactions":5000 and "maxconcurrrentbulkactions":1.

Elasticsearch is not good at executing the kind of queries you are running. I have a few ideas that could improve performance:

  1. Do not request 750000 rows in a single query. Use a search/scroll request instead. If ordering (score sorting) is not required, use a scan/scroll request.
  2. Again, if score sorting is not required, use a term filter for the exact match. Wrap it in a filtered query to put it in the search request.
  3. Wildcard suffix search is an expensive operation. I suggest using the path hierarchy tokenizer in reverse order with '.' as the delimiter. With these settings you can run a term filter/query to match your domains.
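The three suggestions above can be sketched roughly as follows in Python, written against the 1.x query DSL. The names `scan_scroll`, `exact_match_body`, `DOMAIN_ANALYSIS`, `domain_suffixes`, `domain_suffix_analyzer` and `domain_suffix_tokens` are made up for this example, and the `start` / `next_page` callables stand in for whatever HTTP or client calls you use. `domain_suffix_tokens` only emulates what the reversed tokenizer is expected to emit, so verify the real output with the `_analyze` API before relying on it.

```python
def scan_scroll(start, next_page):
    """Yield hits page by page instead of asking for 750000 rows at once.

    start() is assumed to issue the initial search with
    search_type=scan&scroll=1m; next_page(scroll_id) is assumed to call
    /_search/scroll. Both return the parsed JSON response (hypothetical).
    """
    scroll_id = start()["_scroll_id"]  # the scan request itself carries no hits
    while True:
        page = next_page(scroll_id)
        hits = page["hits"]["hits"]
        if not hits:
            return
        yield from hits
        scroll_id = page["_scroll_id"]


def exact_match_body(field, value, page_size=1000):
    """Exact match without scoring: a term filter wrapped in a filtered query."""
    return {
        "size": page_size,
        "query": {"filtered": {"filter": {"term": {field: value}}}},
    }


# Index settings for a reversed path hierarchy tokenizer split on '.'.
# Applying them needs a mapping change and a reindex (not shown here).
DOMAIN_ANALYSIS = {
    "analysis": {
        "tokenizer": {
            "domain_suffixes": {
                "type": "path_hierarchy",
                "delimiter": ".",
                "reverse": True,
            }
        },
        "analyzer": {
            "domain_suffix_analyzer": {
                "type": "custom",
                "tokenizer": "domain_suffixes",
                "filter": ["lowercase"],
            }
        },
    }
}


def domain_suffix_tokens(domain):
    """Emulate the tokens the reversed tokenizer should produce for a domain."""
    labels = domain.lower().split(".")
    return [".".join(labels[i:]) for i in range(len(labels))]
```

With such an analyzer on dname, a document for "www.abc.com" would also be indexed under "abc.com", so a single term filter on "abc.com" covers both the exact match and the "ends with" case, and the wildcard query is no longer needed.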