Not able to load data from Hive to Elasticsearch using EsStorageHandler

I am loading data from AWS into a Hive table and then writing it into an Elasticsearch-backed Hive table, following the ES-Hadoop integration guide: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

The steps I am following are:

CREATE EXTERNAL TABLE IF NOT EXISTS abc (schema)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource' = 'pp/pp',
    'es.nodes' = 'ip-10-20-***-152',
    'es.port' = '****',
    'es.nodes.wan.only' = 'true',
    'es.index.auto.create' = 'true',
    'es.batch.write.retry.count' = '-1',
    'es.batch.write.retry.wait' = '2',
    'es.batch.size.bytes' = '50',
    'es.batch.size.entries' = '200',
    'es.write.rest.error.handlers' = 'log',
    'es.write.rest.error.handler.log.logger.name' = 'BulkErrors',
    'es.write.rest.error.handler.log.logger.level' = 'ERROR',
    'network.bind_host' = 'ip of master es node'
);


CREATE EXTERNAL TABLE IF NOT EXISTS xyz (schema)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION 'aws location path';


INSERT INTO abc SELECT * FROM xyz;

My cluster specifications are as follows:
Number of nodes: 5 (1 master, 4 data nodes)
Cores: 8
RAM: 16 GB
Storage per node: master 1 TB; 2 data nodes 1 TB each; the other 2 data nodes 100 GB each.

When I execute the query, there are no errors in the logs, but the Tez process never completes. Everything stays in the "running" state and then moves to failed jobs, which clearly indicates that Elasticsearch is rejecting the write requests. I could find nothing in the Elasticsearch logs. Could anyone please help me resolve this?
I need to load 455 GB of data, and later the volume will grow to 25 TB.

I moved this over into the Hadoop category to make sure the right people are reading it.


Since you're running Hive in AWS, you might want to make sure that Hive workers are able to reach Elasticsearch. Generally, the task executors are on their own private network, and external services like Elasticsearch need to be explicitly whitelisted for traffic.

If you can find a way to increase the logging level of Hive and post the logs here, we might be able to help more.

@james.baiera Thanks for your response. I was able to load 17 GB of data perfectly with the same queries mentioned above, so I assume the Hive workers are able to reach Elasticsearch.
Now, when I try to load 433 GB of data, the jobs go into a failed state and I get "org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried".
Something seems to be wrong with the Elasticsearch cluster, although I have given each node enough storage to hold the 433 GB of data. Could you please help me figure this out?
TIA

@krishnaSH I would make sure you aren't seeing any rejections or timeouts in the logs. Do you have any more logs available to share?

I am seeing a rejection after 182 GB made it into my index.
The rejection is:

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed

This is a generic message indicating that the connector failed to communicate with all of the nodes it is aware of. Can you increase the logging level and post the complete job log here so we can see the reasons for the connection errors?
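
For reference, a minimal sketch of adjusting the level at which the bulk-error handler you already configured logs rejected documents (ALTER TABLE ... SET TBLPROPERTIES is standard Hive; the es.write.rest.error.handler.log.* keys are the same ones from your DDL):

-- Sketch: have the "BulkErrors" handler log rejected bulk entries at WARN
-- instead of ERROR. Note this only covers bulk rejections; the connector's
-- connection-level logging is raised separately, through the job's log4j
-- configuration for the org.elasticsearch.hadoop category.
ALTER TABLE abc SET TBLPROPERTIES (
    'es.write.rest.error.handlers' = 'log',
    'es.write.rest.error.handler.log.logger.name' = 'BulkErrors',
    'es.write.rest.error.handler.log.logger.level' = 'WARN'
);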

Sure, here are my logs:

[2018-04-18T07:22:56,753][DEBUG][o.e.a.a.i.m.p.TransportPutMappingAction] [es-master-01] failed to put mappings on indices [[[patientproblem1million/W81Vp0sKToeBOwdUWwYakw]]], type [test_type]
java.lang.IllegalArgumentException: mapper [PatientProblemCode] of different type, current_type [long], merged_type [text]
	at org.elasticsearch.index.mapper.FieldMapper.doMerge(FieldMapper.java:354) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.NumberFieldMapper.doMerge(NumberFieldMapper.java:1038) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.FieldMapper.merge(FieldMapper.java:340) ~[elasticsearch-6.1.2.jar:6.1.2]
	
[2018-04-18T07:22:56,775][DEBUG][o.e.a.a.i.m.p.TransportPutMappingAction] [es-master-01] failed to put mappings on indices [[[patientproblem1million/W81Vp0sKToeBOwdUWwYakw]]], type [test_type]
java.lang.IllegalArgumentException: mapper [PatientProblemCode] of different type, current_type [long], merged_type [text]
	...
	at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:477) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org..
[2018-04-18T07:22:56,775][DEBUG][o.e.a.a.i.m.p.TransportPutMappingAction] [es-master-01] failed to put mappings on indices [[[patientproblem1million/W81Vp0sKToeBOwdUWwYakw]]], type [test_type]
java.lang.IllegalArgumentException: mapper [PatientProblemCode] of different type, current_type [long], merged_type [text]
	at org.elasticsearch.index.mapper.FieldMapper.doMerge(FieldMapper.java:354) ~[elasticsearch-6.1.2.jar:6.1.2]
...
[2018-04-18T07:23:06,831][INFO ][o.e.m.j.JvmGcMonitorService] [es-master-01] [gc][8389] overhead, spent [421ms] collecting in the last [1s]
[2018-04-18T07:23:07,906][INFO ][o.e.m.j.JvmGcMonitorService] [es-master-01] [gc][8390] overhead, spent [312ms] collecting in the last [1s]
[2018-04-18T07:23:09,907][INFO ][o.e.m.j.JvmGcMonitorService] [es-master-01] [gc][8392] overhead, spent [337ms] collecting in the last [1s]
[2018-04-18T07:24:20,301][INFO ][o.e.c.m.MetaDataDeleteIndexService] [es-master-01] [patientproblem1million/W81Vp0sKToeBOwdUWwYakw] deleting index
[2018-04-18T07:47:21,797][INFO ][o.e.c.r.a.DiskThresholdMonitor] [es-master-01] low disk watermark [85%] exceeded on [bPlGIe5sSfi3AlNUAqBGng][es-dats][/home/ec2-user/figmd/elasticsearch-6.1.2/data/nodes/0] free: 14.3gb[14.5%], replicas will not be assigned to this node
[2018-04-18T07:47:51,806][INFO ][o.e.c.r.a.DiskThresholdMonitor] [es-master-01] low disk watermark [85%] exceeded on [bPlGIe5sSfi3AlNUAqBGng][es-dats][/home/ec2-user/figmd/elasticsearch-6.1.2/data/nodes/0] free: 13.8gb[14%], replicas will not be assigned to this node
[2018-04-18T07:48:21,816][INFO ][o.e.c.r.a.DiskThresholdMonitor] [es-master-01] low disk watermark [85%] exceeded on [bPlGIe5sSfi3AlNUAqBGng][es-dats][/home/ec2-user/figm/elasticsearch-6.1.2/data/nodes/0] free: 9.4gb[9.6%], shards will be relocated away from this node
[2018-04-18T07:49:51,850][INFO ][o.e.c.r.a.DiskThresholdMonitor] [es-master-01] rerouting shards: [high disk watermark exceeded on one or more nodes]
[2018-04-18T07:52:51,922][WARN ][o.e.c.r.a.DiskThresholdMonitor] [es-master-01] high disk failed for shard id [[es][0]], allocation id [adO3yrpvTmK6XAe2bAO7oQ], primary term [0], message [shard failure, reason [merge failed]], failure [NotSerializableExceptionWrapper[merge_exception: java.io.IOException: No space left on device]; nested: IOException[No space left on device]; ]
org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper: merge_exception: java.io.IOException: No space left on device
	at org.elasticsearch.index.engine.InternalEngine$EngineMergeScheduler$2.doRun(InternalEngine.java:2015) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:637) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-6.1.2.jar:6.1.2]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_152]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_152]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]
Caused by: java.io.IOException: No space left on device
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:?]
	at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60) ~[?:?]
...
at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:661) ~[lucene-core-7.1.0.jar:7.1.0 84c90ad2c0218156c840e19a64d72b8a38550659 - ubuntu - 2017-10-13 16:12:42]
	Suppressed: java.io.IOException: No space left on device
		at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:?]
		at org.apache.lucene.store.FSDirectory$FSIndexOutput$1.write(FSDirectory.java:419) ~[lucene-core-7.1.0.jar:7.1.0 84c90ad2c0218156c840e19a64d72b8a38550659 - ubuntu - 2017-10-13 16:12:42]
		at java.util.zip.CheckedOutputStream.write(CheckedOutputStream.java:73) ~[?:1.8.0_152]
		at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_152]
		at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_152]
		at org.apache.lucene.store.OutputStreamIndexOutput.close(OutputStreamIndexOutput.java:68) ~[lucene-core-7.1.0.jar:7.1.0 84c90ad2c0218156c840e19a64d72b8a38550659 - ubuntu - 2017-10-13 16:12:42]
..
		Suppressed: java.io.IOException: No space left on device
			...
           MergeScheduler.java:99) ~[elasticsearch-6.1.2.jar:6.1.2]
			at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:661) ~[lucene-core-7.1.0.jar:7.1.0 84c90ad2c0218156c840e19a64d72b8a38550659 - ubuntu - 2017-10-13 16:12:42]
[2018-04-18T07:58:32,083][INFO ][o.e.c.r.a.AllocationService] [es-master-01] Cluster health status changed from [GREEN] to [RED] (reason: [shards failed [[es][0]] ...]).

Below are the logs for a different index where I encountered the same issue:

[2018-04-18T07:21:53,756][INFO ][o.e.c.m.MetaDataMappingService] [es-master-01] [patientproblem1million/er_hBJwiR86MePF3BY2Q1w] create_mapping [test_type]
[2018-04-18T07:21:53,769][DEBUG][o.e.a.a.i.m.p.TransportPutMappingAction] [es-master-01] failed to put mappings on indices [[[patientproblem1million/er_hBJwiR86MePF3BY2Q1w]]], type [test_type]
java.lang.IllegalArgumentException: mapper [PatientProblemCode] of different type, current_type [text], merged_type [long]
	at org.elasticsearch.index.mapper.FieldMapper.doMerge(FieldMapper.java:354) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.TextFieldMapper.doMerge(TextFieldMapper.java:372) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.FieldMapper.merge(FieldMapper.java:340) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.FieldMapper.merge(FieldMapper.java:52) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:477) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.RootObjectMapper.doMerge(RootObjectMapper.java:277) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:449) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.RootObjectMapper.merge(RootObjectMapper.java:272) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.Mapping.merge(Mapping.java:89) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.mapper.DocumentMapper.merge(DocumentMapper.java:304) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:267) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:640) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:270) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:195) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:130) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:247) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:210) [elasticsearch-6.1.2.jar:6.1.2]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_152]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_152]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]

So it looks like you have several different problems here. The biggest, from your first set of logs, is that you are running out of storage space for Elasticsearch: the disk watermark warnings escalate until a merge fails with "No space left on device".

From your second set of logs: it also looks like you have inconsistent data types in your fields (the PatientProblemCode mapper flips between long and text), or an unrelated client is submitting erroneous data.

@james.baiera I see.

  1. But looking at the current state of storage with htop and top on the nodes, I see that there is still enough space for the 433 GB of data. I could try increasing the number of nodes, though. What do you think?
  2. Also, my data is all unstructured; it consists of patient health notes. Do you think that is causing an issue?
     I came across an open-source tool, Embulk, which bulk-uploads CSV directly to ES. I'm giving it a try; let's see how it goes.

There could be any number of issues with the disk that may be causing Elasticsearch to think it's running out of space. If you are running in the cloud, I would reach out to your cloud provider to see if they see anything wrong with those nodes.

Unstructured data isn't really a problem unless there are fields with conflicting data types. Elasticsearch makes a lot of decisions at index time to make searching as fast as possible. Because of this, it is a best practice to figure out a reasonable mapping when building a solution, and to ensure that your indices are created with those mappings, either ahead of time or via an index template.

Additionally, a little bit of data cleansing goes a long way in preventing problems before they become real issues. I would make sure that all of your data fits with your mappings before attempting to index it into Elasticsearch.
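
For illustration, a minimal HiveQL sketch (the abc_clean table and every column except PatientProblemCode are hypothetical): declare the conflicting field with a single Hive type and cast on insert, so every document reaches Elasticsearch with the same type:

-- Hypothetical sketch: force PatientProblemCode (the conflicting field from
-- the logs above) to a single type, so the index mapping never sees both
-- long and text values for it.
CREATE EXTERNAL TABLE IF NOT EXISTS abc_clean (
    patient_id BIGINT,          -- hypothetical column
    PatientProblemCode STRING,  -- one consistent type for the conflicting field
    note STRING                 -- hypothetical column
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'pp/pp', 'es.nodes' = 'ip-10-20-***-152');

INSERT INTO abc_clean
SELECT patient_id, CAST(PatientProblemCode AS STRING), note
FROM xyz;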


@james.baiera Thanks a ton for the very helpful information. I ran the job with the Hive-Elasticsearch integration and applied the encoding property to handle the UTF-8 encoding errors, and the job ran without any errors in the logs.
However, the job now keeps running and does not stop.

  1. My data is 433 GB, with 532 million records in the CSV.
     The job ran for around 30 hours and thereafter showed the status as:

size of data in the index in ES: 478 GB,
number of records: 638,363,606.

After it exceeded the total source data size it still kept running, and I had to stop it manually due to workplace constraints. Given that my total storage is 1 TB, there was also the possibility of crashing the Elasticsearch cluster if it were left unmonitored for a long time.

  2. Also, when I checked the next morning, the data in the index had decreased to 504 GB. I have noticed many times that the size decreases abruptly. Is ES internally balancing the load? But then how would that affect the overall index size?

I would be grateful if you could shed some light on why this happens. Do you think I should keep it running?

For anyone else facing the same issue: I resolved this exception by setting the property that handles the character encoding.
The property I applied on the Hive table was: 'serialization.encoding'='ISO88591'
Once this was applied, my data went into Elasticsearch nicely and the load took around 6-7 hours. :slight_smile:
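
In case it helps, this is roughly how the property can be applied (a sketch; xyz is the delimited source table from my first post, and whether SERDEPROPERTIES or TBLPROPERTIES is picked up may depend on your Hive version):

-- Apply the encoding to the existing delimited source table so Hive decodes
-- the file bytes as ISO-8859-1 instead of the default UTF-8.
ALTER TABLE xyz SET SERDEPROPERTIES ('serialization.encoding'='ISO88591');

-- Or declare it up front when creating the table:
CREATE EXTERNAL TABLE IF NOT EXISTS xyz (schema)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION 'aws location path'
TBLPROPERTIES ('serialization.encoding'='ISO88591');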

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.