How can I use a MapReduce (MR) job to import nested data into Elasticsearch (ES)?

My attempt:
public class Hadoop2ElasticsearchWithJSON extends Configured implements Tool {

public static void main(String args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Hadoop2ElasticsearchWithJSON(), args);
System.exit(res);
}

/**

  • Map-reduce program for importing Tweet User exampler data.
  • Files are named as: User.csv.
  • Format:
  • nodeId,name
  • 1,jack
    */
    public int run(String args) throws Exception {
    Configuration conf = this.getConf();conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    conf.set("es.nodes", "");
    // conf.set("es.nodes","127.0.0.1:9200");
    conf.set("es.resource", "hadoop-person/hadoop-person");
    conf.set("es.output.json", "true");Job job = Job.getInstance(conf, "es-hadoop-example");
    job.setJarByClass(Hadoop2ElasticsearchWithJSON.class);
    job.setMapperClass(Hadoop2ElasticsearchMapper.class);
    job.setOutputFormatClass(EsOutputFormat.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(MapWritable.class);FileInputFormat.addInputPath(job, new Path("tinkerpop-modern-2.json"));
    FileOutputFormat.setOutputPath(job, new Path("output"));job.waitForCompletion(true);
    return 0;
    }

private static class Hadoop2ElasticsearchMapper extends Mapper<Object, Text, NullWritable, MapWritable> {

public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
    JSONObject vertex = JSONObject.parseObject(value.toString());
    JSONObject properties = vertex.getJSONObject("properties");
    JSONArray nest = new JSONArray();
    for (String fieldName : properties.keySet()) {
        JSONArray jsonArray = properties.getJSONArray(fieldName);
        for (Object o : jsonArray) {
            JSONObject property = JSONObject.parseObject(o.toString());
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("prop", fieldName);
            jsonObject.put("val", property.getString("value"));
            nest.add(jsonObject);
        }
    }
    MapWritable doc = new MapWritable();
    doc.put(new Text("ontology"), new JsonWritable(nest));
    context.write(NullWritable.get(), doc);
}

}
}

JsonWritable:
/**

Writable representing a JSON object.
*/
public class JsonWritable implements Writable {
private static final Gson GSON = new Gson();
private JSONArray jsonArray;

/**

Creates an empty {@code JsonWritable}.
*/
public JsonWritable() {
jsonArray = new JSONArray();
}

public JsonWritable(JSONArray jsonArray) {
this.jsonArray = jsonArray;
}

/**

Deserializes a {@code JsonWritable} object. *
@param in source for raw byte representation
*/
@override
public void readFields(DataInput in) throws IOException {
int cnt = in.readInt();
byte buf = new byte[cnt];
in.readFully(buf);
jsonArray = JSONArray.parseArray(new String(buf, "UTF-8"));
}

/**

Serializes this object. *
@param out where to write the raw byte representation
*/
@override
public void write(DataOutput out) throws IOException {
byte buf = GSON.toJson(jsonArray).getBytes();
out.writeInt(buf.length);
out.write(buf);
}

/**

Returns the serialized representation of this object as a byte array. *
@return byte array representing the serialized representation of this object
@throws IOException
*/
public byte serialize() throws IOException {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(bytesOut);
write(dataOut);

return bytesOut.toByteArray();

}

public JSONArray getJsonArray() {
return jsonArray;
}

@override
public String toString() {
return getJsonArray().toString();
}

public static JsonWritable create(DataInput in) throws IOException {
JsonWritable json = new JsonWritable();
json.readFields(in);

return json;

}

public static JsonWritable create(byte bytes) throws IOException {
return create(new DataInputStream(new ByteArrayInputStream(bytes)));
}
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.