Unable to write existing JSON from HDFS to Elasticsearch using MapReduce

I am trying to write existing JSON from HDFS to Elasticsearch, but the job fails with the error below. Has anyone run into this problem? Any help is appreciated.
Thank you in advance.

Error: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [10.0.0.12:9200] returned Bad Request(400) - failed to parse, document is empty; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

17/05/10 22:59:58 INFO mapreduce.Job: Task Id : attempt_1494321840981_0039_r_000000_1, Status : FAILED
Error: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [10.0.0.12:9200] returned Bad Request(400) - failed to parse, document is empty; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

17/05/10 23:00:05 INFO mapreduce.Job: Task Id : attempt_1494321840981_0039_r_000000_2, Status : FAILED
Error: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [10.0.0.15:9200] returned Bad Request(400) - failed to parse, document is empty; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

17/05/10 23:00:12 INFO mapreduce.Job: map 100% reduce 100%
17/05/10 23:00:12 INFO mapreduce.Job: Job job_1494321840981_0039 failed with state FAILED due to: Task failed task_1494321840981_0039_r_000000
Job failed as tasks failed. failedMaps:0 failedReduces:1

17/05/10 23:00:13 INFO mapreduce.Job: Counters: 37
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=106205
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=574
HDFS: Number of bytes written=0
HDFS: Number of read operations=2
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Failed reduce tasks=4
Launched map tasks=1
Launched reduce tasks=4
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=7324
Total time spent by all reduces in occupied slots (ms)=31992
Total time spent by all map tasks (ms)=3662
Total time spent by all reduce tasks (ms)=15996
Total vcore-seconds taken by all map tasks=3662
Total vcore-seconds taken by all reduce tasks=15996
Total megabyte-seconds taken by all map tasks=14999552
Total megabyte-seconds taken by all reduce tasks=65519616
Map-Reduce Framework
Map input records=6
Map output records=6
Map output bytes=491
Map output materialized bytes=509
Input split bytes=101
Combine input records=0
Spilled Records=6
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=26
CPU time spent (ms)=880
Physical memory (bytes) snapshot=341835776
Virtual memory (bytes) snapshot=2891649024
Total committed heap usage (bytes)=1010302976
File Input Format Counters
Bytes Read=473

Version Info

OS : Ubuntu 14.04
JVM : 1.7
Hadoop/Spark: Hadoop-2.6.0
ES-Hadoop : 5.4.0
ES : 5.0.1

The code is as follows.

package com.javacore.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

import java.io.IOException;

public class HdfsToES {

    public static class MyMapper extends Mapper<Object, Text, NullWritable,
            BytesWritable> {

        public void map(Object key, Text value, Mapper<Object, Text,
                NullWritable, BytesWritable>.Context context) throws IOException, InterruptedException {
            byte[] line = value.toString().trim().getBytes();
            BytesWritable blog = new BytesWritable(line);
            context.write(NullWritable.get(), blog);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "10.0.0.10:9200");
        conf.set("es.resource", "blog/csdn");
        conf.set("es.mapping.id", "id");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf, "hadoop es write test");
        job.setMapperClass(HdfsToES.MyMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setJarByClass(HdfsToES.class);
        FileInputFormat.setInputPaths(job, new Path
                ("hdfs://10.0.0.13:9000/work/blog.json"));

        job.waitForCompletion(true);
    }

}

My JSON data is also simple. Please forgive my poor English, and thank you in advance.

{"id":"1","title":"git","posttime":"2017-05-01","content":"The main difference between SVN and Git"}
{"id":"2","title":"ava","posttime":"2017-05-02","content":"Basic operation: CRUD ..."}
{"id":"3","title":"SQL","posttime":"2017-05-03","content":"SQL."}
{"id":"4","title":"Hibernate","posttime":"2017-05-04","content":"Hibernate"}
{"id":"5","title":"Shell","posttime":"2017-05-05","content":"What is Shell."}
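One thing that may be worth ruling out: the counters above report Map input records=6, while only five documents are listed here, so the input file may contain a line that is not shown (a blank trailing line, for example). Whether that is what triggers "document is empty" is not confirmed. The following is only a minimal sketch, in plain Java with no Hadoop dependency, of the check that could be applied to each input line before it is written out. The class name JsonLineFilter and the method toDocuments are hypothetical and are not part of ES-Hadoop or Hadoop.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not an ES-Hadoop API): drops blank lines so that
// no empty document is ever handed to the output format.
public class JsonLineFilter {

    // Returns the UTF-8 bytes of every non-blank line, trimmed of
    // surrounding whitespace. Blank or whitespace-only lines are skipped.
    public static List<byte[]> toDocuments(List<String> lines) {
        List<byte[]> docs = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue; // an empty line would become an empty document
            }
            // Explicit charset instead of the platform default.
            docs.add(line.getBytes(StandardCharsets.UTF_8));
        }
        return docs;
    }
}
```

In the mapper above, the equivalent check would be to return from map() without calling context.write() when the trimmed line is empty. If the job still fails after that, a blank line is not the cause.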

A friend of mine runs Hadoop 2.7.0 with Elasticsearch 2.3 and ES-Hadoop 2.3, and it works perfectly, but he hit the same problem when he switched to ES-Hadoop 5.4.0. So my initial guess is that a bug in this ES-Hadoop version causes the JSON parsing to fail. However, I don't want to change my ES version. Please help me!

It also reports this error:

17/05/11 11:58:24 INFO mapreduce.Job: Task Id : attempt_1494321840981_0058_r_000000_1, Status : FAILED
Error: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [10.0.0.12:9200] returned Bad Request(400) - failed to parse, document is empty; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
