From hive to es: Could not write all entries[832/832] (maybe es was overloaded?)

My Hive cluster has more than 30 nodes, and my table takes up almost 140 GB. My Elasticsearch cluster (8 data nodes, each with 8 cores and 16 GB of memory) is separate from the Hive cluster. Now
I want to load data from Hive into ES following the Apache Hive integration documentation.

The following is my HiveQL script:

add jar elasticsearch-hadoop-5.2.2.jar;

drop table database_X.artists;
CREATE EXTERNAL TABLE database_X.artists(
user_id string,
province int ,
...
col34 string)
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
tblproperties(
  'es.resource' = 'dillon_pengcz/artists',
  'es.nodes' = '172.21.8.24',
  'es.index.auto.create' = 'true',
  'es.mapping.id' = 'caa_id',
  'es.batch.size.entries' = '0',
  'es.batch.size.bytes' = '8mb');

insert overwrite table database_X.artists select * from database_X.artists_src;
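One note on these properties: with `es.batch.size.entries` set to `0`, batches are limited only by `es.batch.size.bytes` (8 MB here). When ES rejects bulk requests, the connector's retry behavior can also be tuned via `es.batch.write.retry.count` and `es.batch.write.retry.wait` (real elasticsearch-hadoop settings; the values below are illustrative assumptions, not recommendations from this thread):

```sql
-- Sketch: additional tblproperties for handling bulk rejections
-- (defaults are 3 retries with a 10s wait)
--   'es.batch.write.retry.count' = '10',  -- retry rejected bulk requests more times
--   'es.batch.write.retry.wait'  = '60s'  -- wait longer between retries
```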

When I executed the above script, I got the following errors:

Number of reduce tasks is set to 0 since there's no reduce operator

Starting Job = job_1489400630906_38677, Tracking URL = http://nn1.bitauto.dmp:8088/proxy/application_1489400630906_38677/
Kill Command = /opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/lib/hadoop/bin/hadoop job  -kill job_1489400630906_38677
Hadoop job information for Stage-0: number of mappers: 603; number of reducers: 0
2017-03-28 16:21:44,824 Stage-0 map = 0%,  reduce = 0%
2017-03-28 16:22:45,755 Stage-0 map = 0%,  reduce = 0%, Cumulative CPU 1747.49 sec

...
...

Caused by: org.elasticsearch.hadoop.EsHadoopException: Could not write all entries [832/832] (Maybe ES was overloaded?). Error sample (first [5] error messages):
	rejected execution of org.elasticsearch.transport.TransportService$7@40f918fc on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@242066f4[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 17811011]]
	rejected execution of org.elasticsearch.transport.TransportService$7@67c6690b on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@242066f4[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 17811011]]
	rejected execution of org.elasticsearch.transport.TransportService$7@3ae19c40 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@242066f4[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 17811011]]
	rejected execution of org.elasticsearch.transport.TransportService$7@65dc5ac6 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@242066f4[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 17811011]]
	rejected execution of org.elasticsearch.transport.TransportService$7@65dc5ac6 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@242066f4[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 17811011]]

What should I do to find the cause? Any advice would be appreciated!

It says here that you're using 603 mappers to write to Elasticsearch, and Elasticsearch's bulk queue is being overwhelmed with indexing requests. 832 entries per bulk does not seem like very much. If you configure your Hive query to use a combined input format to lower the number of splits in the job, ES will receive fewer, larger batches of records, and its task queue will fill up less often.
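Concretely, that advice might look like the following in the Hive session. The `mapreduce.input.fileinputformat.split.*` properties control how much data CombineHiveInputFormat packs into each split; the sizes below are illustrative assumptions, not values from this thread:

```sql
-- Sketch: combine input files into fewer, larger splits
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapreduce.input.fileinputformat.split.maxsize=1073741824;  -- 1 GB per split (illustrative)
set mapreduce.input.fileinputformat.split.minsize=536870912;   -- 512 MB minimum (illustrative)
```

Without raising the split sizes, CombineHiveInputFormat may still produce roughly as many splits as before.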

Thank you James Baiera!

According to your reply:

I added a setting before the insert statement, as follows:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
insert overwrite table database_X.artists select * from database_X.artists_src;

And I found the corresponding log lines for CombineHiveInputFormat:

[main] INFO org.apache.hadoop.hive.ql.io.CombineHiveInputFormat - number of splits 618
[main] INFO org.apache.hadoop.hive.ql.io.CombineHiveInputFormat - Number of all splits 618

But the same error still occurs. Did I use CombineHiveInputFormat incorrectly?

Looking at the errors in more detail, the entry counts vary quite a bit:

  1. Could not write all entries [222/222] (Maybe ES was overloaded?). Error sample (first [5] error messages):
  2. Could not write all entries [425/425] (Maybe ES was overloaded?). Error sample (first [5] error messages):
  3. Could not write all entries [181/181] (Maybe ES was overloaded?). Error sample (first [5] error messages):
  4. Could not write all entries [224/224] (Maybe ES was overloaded?). Error sample (first [5] error messages):
    ...

First, I'm sorry if I'm not asking the right questions :slight_smile:
The following is my understanding. This time, the number of mappers (and splits) is 618.

The bulk queue size is 50, right?
How many indexing requests are there? Can I monitor or check that?
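(For reference, the bulk thread pool's active threads, queue depth, and rejection count can be inspected with Elasticsearch's `_cat/thread_pool` API; a sketch against this cluster's node, not verified in this thread:)

```
curl -s 'http://172.21.8.24:9200/_cat/thread_pool/bulk?v&h=node_name,active,queue,rejected'
```

A steadily growing `rejected` column on the data nodes would confirm that bulk requests are being dropped because the queue is full.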

Of course, this time they are 222, 425, 181, 224. How many entries are there per split (or mapper)? Why are the numbers so small, given the size of the source table:

hive> dfs -du -s -h hdfs://.../database_X/artists_src;
165.2 G 495.7 G hdfs://.../database_X/artists_src

the data per split is roughly: 165.2 * 1024 MB / 618 ≈ 274 MB
I know I must be missing something.
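As a quick sanity check on that arithmetic:

```shell
# Back-of-envelope: 165.2 GB of source data spread across 618 splits
awk 'BEGIN { printf "%.0f\n", 165.2 * 1024 / 618 }'   # prints 274 (MB per split)
```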

"task queue" here you mean mapper or bulk?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.