Problem when writing to Elasticsearch using ES-Hadoop

Hi all,
I'm getting this exception when trying to write to Elasticsearch from a MapReduce program using es-hadoop. I'm writing to index=employee and type=basic, which already exist in my Elasticsearch cluster.

My stack trace:

```
Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No resource ['es.resource'] (index/query/location) specified
    at org.elasticsearch.hadoop.util.Assert.hasText(Assert.java:30)
    at org.elasticsearch.hadoop.mr.EsOutputFormat.init(EsOutputFormat.java:257)
    at org.elasticsearch.hadoop.mr.EsOutputFormat.checkOutputSpecs(EsOutputFormat.java:233)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:266)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:139)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
    at com.mstack.mapreduce.DIGDriver.main(DIGDriver.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
```

My driver class:

```java
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "es-hadoop");
job.setJarByClass(DIGDriver.class);
conf.set("es.nodes", "localhost:9200");
conf.set("es.port", "9200");
conf.set("es.resource", "employee/basic");
job.setNumReduceTasks(0);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapperClass(DIGMapper.class);
job.setMapOutputValueClass(MapWritable.class);
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
boolean status = job.waitForCompletion(true);
if (status) {
    System.exit(0);
} else {
    System.out.println("Job Failed : Some error!");
    System.exit(1);
}
```

Please help me with this.

You are not initializing the configuration correctly. From the javadocs (Job#getInstance):

```
Creates a new Job with no particular Cluster and a given Configuration.

The Job makes a copy of the Configuration so that any necessary internal
modifications do not reflect on the incoming parameter.
```
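
To make the copy semantics concrete, here is a tiny standalone sketch (the `ConfCopyDemo` class name is just for illustration):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ConfCopyDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Job.getInstance copies conf at this point.
        Job job = Job.getInstance(conf, "demo");
        // This mutates the original conf, not the job's private copy.
        conf.set("es.resource", "employee/basic");
        // Prints "null": the job never saw the property set above.
        System.out.println(job.getConfiguration().get("es.resource"));
    }
}
```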

Set up the Configuration fully before creating the Job instance. Note that there are several other gotchas like this around Configuration, so always check the javadocs and the sources.
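
In your driver that means moving every `conf.set(...)` above `Job.getInstance(conf, ...)`. A minimal corrected sketch, reusing the `DIGDriver`/`DIGMapper` classes from your post (I've also moved the port into `es.port` only, since it then doesn't need to be repeated in `es.nodes`):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.mapreduce.Job;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class DIGDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // All es-hadoop and speculative-execution settings go in BEFORE
        // Job.getInstance, because the Job works on a copy of conf.
        conf.set("es.nodes", "localhost");
        conf.set("es.port", "9200");
        conf.set("es.resource", "employee/basic");
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);

        Job job = Job.getInstance(conf, "es-hadoop");
        job.setJarByClass(DIGDriver.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapperClass(DIGMapper.class);
        job.setMapOutputValueClass(MapWritable.class);

        if (job.waitForCompletion(true)) {
            System.exit(0);
        } else {
            System.out.println("Job Failed : Some error!");
            System.exit(1);
        }
    }
}
```

Alternatively, keep your original order and call `job.getConfiguration().set(...)` instead of `conf.set(...)`; that mutates the copy the job actually uses.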
P.S. Formatting helps.