How to use MR to import nested data into ES?

My Trying:
public class Hadoop2ElasticsearchWithJSON extends Configured implements Tool {

public static void main(String args) throws Exception {
int res = Configuration(), new Hadoop2ElasticsearchWithJSON(), args);


  • Map-reduce program for importing Tweet User exampler data.
  • Files are named as: User.csv.
  • Format:
  • nodeId,name
  • 1,jack
    public int run(String args) throws Exception {
    Configuration conf = this.getConf();conf.setBoolean("", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    conf.set("es.nodes", "");
    // conf.set("es.nodes","");
    conf.set("es.resource", "hadoop-person/hadoop-person");
    conf.set("es.output.json", "true");Job job = Job.getInstance(conf, "es-hadoop-example");
    job.setMapOutputValueClass(MapWritable.class);FileInputFormat.addInputPath(job, new Path("tinkerpop-modern-2.json"));
    FileOutputFormat.setOutputPath(job, new Path("output"));job.waitForCompletion(true);
    return 0;

private static class Hadoop2ElasticsearchMapper extends Mapper<Object, Text, NullWritable, MapWritable> {

public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    JSONObject vertex = JSONObject.parseObject(value.toString());
    JSONObject properties = vertex.getJSONObject("properties");
    JSONArray nest = new JSONArray();
    for (String fieldName : properties.keySet()) {
        JSONArray jsonArray = properties.getJSONArray(fieldName);
        for (Object o : jsonArray) {
            JSONObject property = JSONObject.parseObject(o.toString());
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("prop", fieldName);
            jsonObject.put("val", property.getString("value"));
    MapWritable doc = new MapWritable();
    doc.put(new Text("ontology"), new JsonWritable(nest));
    context.write(NullWritable.get(), doc);



Writable representing a JSON object.
public class JsonWritable implements Writable {
private static final Gson GSON = new Gson();
private JSONArray jsonArray;


Creates an empty {@code JsonWritable}.
public JsonWritable() {
jsonArray = new JSONArray();

public JsonWritable(JSONArray jsonArray) {
this.jsonArray = jsonArray;


Deserializes a {@code JsonWritable} object. *
@param in source for raw byte representation
public void readFields(DataInput in) throws IOException {
int cnt = in.readInt();
byte buf = new byte[cnt];
jsonArray = JSONArray.parseArray(new String(buf, "UTF-8"));


Serializes this object. *
@param out where to write the raw byte representation
public void write(DataOutput out) throws IOException {
byte buf = GSON.toJson(jsonArray).getBytes();


Returns the serialized representation of this object as a byte array. *
@return byte array representing the serialized representation of this object
@throws IOException
public byte serialize() throws IOException {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(bytesOut);

return bytesOut.toByteArray();


public JSONArray getJsonArray() {
return jsonArray;

public String toString() {
return getJsonArray().toString();

public static JsonWritable create(DataInput in) throws IOException {
JsonWritable json = new JsonWritable();

return json;


public static JsonWritable create(byte bytes) throws IOException {
return create(new DataInputStream(new ByteArrayInputStream(bytes)));

