Writing to Elasticsearch from HDFS using Map/Reduce

I'm trying to write some files that are stored on HDFS to Elasticsearch using Hadoop MapReduce. I have one mapper and no reducers, and the files are in JSON format.

When I run my code, 800 reducers start running, and when they reach 84% the job fails with this error:
Error: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried ...

However, when I use "conf.setNumReduceTasks(0)" in my Java code, the mapping does not proceed at all; it stays stuck at 0% with this error:
Error: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [...] returned Internal Server Error(500) - [RemoteTransportException[[...][inet[]][indices:data/write/bulk[s]]]; nested: NullPointerException; ]; Bailing out..

I used the following settings for my JobConf:

    conf.addResource(...);
    conf.setMapperClass(Map.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setMapOutputKeyClass(Text.class);
    conf.setSpeculativeExecution(false);
    conf.setNumMapTasks(1);
    conf.set("es.nodes", ES_NODES);
    conf.set("es.resource.write", "...");
    conf.set("mapred.output.compress", "true");
    //conf.setNumReduceTasks(0);

    // Elasticsearch settings
    conf.set("es.input.json", "yes");
    conf.set("es.write.operation", "index");
    conf.set("es.index.auto.create", "yes");
    conf.set("es.field.read.validate.presence", "warn");
    conf.set("es.batch.write.retry.count", "10");
    conf.setOutputFormat(EsOutputFormat.class);
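
For reference, here is a minimal sketch of how a JobConf like this is typically wired into a complete map-only job with the old mapred API; the mapper, input path, node address, and index/type below are illustrative placeholders rather than my actual code:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.elasticsearch.hadoop.mr.EsOutputFormat;

    public class HdfsToEsJob {

        // Pass-through mapper: with es.input.json=true each input line is already a
        // complete JSON document, so it is forwarded as-is; EsOutputFormat ignores the key.
        public static class JsonMapper extends MapReduceBase
                implements Mapper<LongWritable, Text, NullWritable, Text> {
            @Override
            public void map(LongWritable key, Text value,
                            OutputCollector<NullWritable, Text> output, Reporter reporter)
                    throws IOException {
                output.collect(NullWritable.get(), value);
            }
        }

        public static void main(String[] args) throws IOException {
            JobConf conf = new JobConf(HdfsToEsJob.class);

            conf.setInputFormat(TextInputFormat.class);
            FileInputFormat.addInputPath(conf, new Path(args[0]));

            conf.setMapperClass(JsonMapper.class);
            conf.setOutputKeyClass(NullWritable.class);
            conf.setOutputValueClass(Text.class);
            conf.setNumReduceTasks(0);           // map-only: mappers write straight to ES
            conf.setSpeculativeExecution(false); // required by es-hadoop

            conf.set("es.nodes", "localhost:9200");             // placeholder
            conf.set("es.resource.write", "my-index/my-type");  // placeholder
            conf.set("es.input.json", "yes");
            conf.setOutputFormat(EsOutputFormat.class);

            JobClient.runJob(conf);
        }
    }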

Can somebody please tell me what other things I should set for my configuration to avoid these errors?

Thanks! 🙂

The RemoteTransportException followed by the NPE is conspicuous; it looks like a bug inside Elasticsearch itself. Can you please provide more information about that message/log and the version of ES you are using?

As for the initial problem, it is highly likely that you are overloading Elasticsearch and the job eventually fails as the nodes become unresponsive or because there are too many retries.
You could try to either limit the number of reducers or use something like CombineTextInputFormat to combine the inputs/splits in Hadoop and thus reduce the number of mappers (and, in turn, the number of reducers) in your job.
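
As a rough illustration only - the helper class, split size, and batch values below are assumptions to tune for your cluster, not settings taken from your job - combining splits and throttling the connector's bulk writes with the old mapred API (CombineTextInputFormat for this API ships with Hadoop 2.x) could look something like this:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.CombineTextInputFormat;

    public class EsLoadTuning {

        // Applies the two mitigations discussed above to an existing JobConf.
        public static void applyTuning(JobConf conf) {
            // Fewer, larger splits -> fewer concurrent map tasks writing to Elasticsearch.
            // (On Hadoop 1.x the equivalent property is "mapred.max.split.size".)
            conf.setInputFormat(CombineTextInputFormat.class);
            conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);

            // Smaller bulk requests plus a wait between retries lower the per-task load.
            conf.set("es.batch.size.entries", "500");
            conf.set("es.batch.size.bytes", "1mb");
            conf.set("es.batch.write.retry.count", "10");
            conf.set("es.batch.write.retry.wait", "30s");
        }
    }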
