My attempt:
public class Hadoop2ElasticsearchWithJSON extends Configured implements Tool {
public static void main(String args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Hadoop2ElasticsearchWithJSON(), args);
System.exit(res);
}
/**
- Map-reduce program for importing Tweet User exampler data.
- Files are named as: User.csv.
- Format:
- nodeId,name
- 1,jack
*/
public int run(String args) throws Exception {
Configuration conf = this.getConf();conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("es.nodes", "");
// conf.set("es.nodes","127.0.0.1:9200");
conf.set("es.resource", "hadoop-person/hadoop-person");
conf.set("es.output.json", "true");Job job = Job.getInstance(conf, "es-hadoop-example");
job.setJarByClass(Hadoop2ElasticsearchWithJSON.class);
job.setMapperClass(Hadoop2ElasticsearchMapper.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(MapWritable.class);FileInputFormat.addInputPath(job, new Path("tinkerpop-modern-2.json"));
FileOutputFormat.setOutputPath(job, new Path("output"));job.waitForCompletion(true);
return 0;
}
private static class Hadoop2ElasticsearchMapper extends Mapper<Object, Text, NullWritable, MapWritable> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
JSONObject vertex = JSONObject.parseObject(value.toString());
JSONObject properties = vertex.getJSONObject("properties");
JSONArray nest = new JSONArray();
for (String fieldName : properties.keySet()) {
JSONArray jsonArray = properties.getJSONArray(fieldName);
for (Object o : jsonArray) {
JSONObject property = JSONObject.parseObject(o.toString());
JSONObject jsonObject = new JSONObject();
jsonObject.put("prop", fieldName);
jsonObject.put("val", property.getString("value"));
nest.add(jsonObject);
}
}
MapWritable doc = new MapWritable();
doc.put(new Text("ontology"), new JsonWritable(nest));
context.write(NullWritable.get(), doc);
}
}
}
JsonWritable:
/**
Writable representing a JSON object.
*/
public class JsonWritable implements Writable {
private static final Gson GSON = new Gson();
private JSONArray jsonArray;
/**
Creates an empty {@code JsonWritable}.
*/
public JsonWritable() {
jsonArray = new JSONArray();
}
public JsonWritable(JSONArray jsonArray) {
this.jsonArray = jsonArray;
}
/**
Deserializes a {@code JsonWritable} object. *
@param in source for raw byte representation
*/
@override
public void readFields(DataInput in) throws IOException {
int cnt = in.readInt();
byte buf = new byte[cnt];
in.readFully(buf);
jsonArray = JSONArray.parseArray(new String(buf, "UTF-8"));
}
/**
Serializes this object. *
@param out where to write the raw byte representation
*/
@override
public void write(DataOutput out) throws IOException {
byte buf = GSON.toJson(jsonArray).getBytes();
out.writeInt(buf.length);
out.write(buf);
}
/**
Returns the serialized representation of this object as a byte array. *
@return byte array representing the serialized representation of this object
@throws IOException
*/
public byte serialize() throws IOException {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(bytesOut);
write(dataOut);
return bytesOut.toByteArray();
}
public JSONArray getJsonArray() {
return jsonArray;
}
@override
public String toString() {
return getJsonArray().toString();
}
public static JsonWritable create(DataInput in) throws IOException {
JsonWritable json = new JsonWritable();
json.readFields(in);
return json;
}
public static JsonWritable create(byte bytes) throws IOException {
return create(new DataInputStream(new ByteArrayInputStream(bytes)));
}
}