Moh*_*hit 3 java json hadoop mapreduce reducers
我有以下Reducer类
public static class TokenCounterReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
JSONObject jsn = new JSONObject();
for (Text value : values) {
String[] vals = value.toString().split("\t");
String[] targetNodes = vals[0].toString().split(",",-1);
jsn.put("source",vals[1] );
jsn.put("target",targetNodes);
}
// context.write(key, new Text(sum));
}
}
Run Code Online (Sandbox Code Playgroud)
通过示例(免责声明:新手在这里),我可以看到一般输出类型似乎是一个键/值存储.
但是,如果输出中没有任何键,该怎么办?或者如果我想要的话,如果我的输出是其他格式(在我的情况下为json)?
无论如何,从上面的代码:我想写json对象到HDFS?
这在Hadoop流媒体中非常简单..但我如何在Hadoop java中做到这一点?
您可以使用Hadoop的OutputFormat接口来创建自定义格式,这些格式将根据您的意愿写入数据.例如,如果您需要将数据写为JSON对象,那么您可以这样做:
public class JsonOutputFormat extends TextOutputFormat<Text, IntWritable> {
@Override
public RecordWriter<Text, IntWritable> getRecordWriter(
TaskAttemptContext context) throws IOException,
InterruptedException {
Configuration conf = context.getConfiguration();
Path path = getOutputPath(context);
FileSystem fs = path.getFileSystem(conf);
FSDataOutputStream out =
fs.create(new Path(path,context.getJobName()));
return new JsonRecordWriter(out);
}
private static class JsonRecordWriter extends
LineRecordWriter<Text,IntWritable>{
boolean firstRecord = true;
@Override
public synchronized void close(TaskAttemptContext context)
throws IOException {
out.writeChar('{');
super.close(null);
}
@Override
public synchronized void write(Text key, IntWritable value)
throws IOException {
if (!firstRecord){
out.writeChars(",\r\n");
firstRecord = false;
}
out.writeChars("\"" + key.toString() + "\":\""+
value.toString()+"\"");
}
public JsonRecordWriter(DataOutputStream out)
throws IOException{
super(out);
out.writeChar('}');
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果你不想在输出中输入密钥,只需发出null,如:
context.write(NullWritable.get(), new IntWritable(sum));
Run Code Online (Sandbox Code Playgroud)
HTH
如果您只想将一个JSON对象列表写入HDFS而不关心键/值的概念,您可以NullWritable在Reducer输出值中使用a :
public static class TokenCounterReducer extends Reducer<Text, Text, Text, NullWritable> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
JSONObject jsn = new JSONObject();
....
context.write(new Text(jsn.toString()), null);
}
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,您需要更改作业配置:
job.setOutputValueClass(NullWritable.class);
Run Code Online (Sandbox Code Playgroud)
通过将JSON对象写入HDFS,我知道您要存储我在上面描述的JSON的String表示.如果您想将JSON的二进制表示存储到HDFS中,则需要使用SequenceFile.显然你可以Writable为此编写自己的,但我觉得如果你想要一个简单的String表示,这样就更容易了.