Aru*_*A K 21 java hadoop mapreduce
我已尝试MultipleOutputs按照http://hadoop.apache.org/docs/mapreduce/r0.21.0/api/index.html?org/apache/hadoop/mapreduce/lib/output/MultipleOutputs中的示例使用该类html的
驱动程序代码
Configuration conf = new Configuration();
Job job = new Job(conf, "Wordcount");
job.setJarByClass(WordCount.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
Text.class, IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
Run Code Online (Sandbox Code Playgroud)
减速机代码
public class WordCountReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
private MultipleOutputs<Text, IntWritable> mos;
public void setup(Context context){
mos = new MultipleOutputs<Text, IntWritable>(context);
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
//context.write(key, result);
mos.write("text", key,result);
}
public void cleanup(Context context) {
try {
mos.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
发现reducer的输出重命名为text-r-00000
但这里的问题是我也得到一个空的part-r-00000文件.这是预期MultipleOutputs的行为,还是我的代码有问题?请指教.
我尝试过的另一个替代方法是使用FileSystem类迭代我的输出文件夹,并手动重命名以part开头的所有文件.
什么是最好的方法?
FileSystem hdfs = FileSystem.get(configuration);
FileStatus fs[] = hdfs.listStatus(new Path(outputPath));
for (FileStatus aFile : fs) {
if (aFile.isDir()) {
hdfs.delete(aFile.getPath(), true);
// delete all directories and sub-directories (if any) in the output directory
}
else {
if (aFile.getPath().getName().contains("_"))
hdfs.delete(aFile.getPath(), true);
// delete all log files and the _SUCCESS file in the output directory
else {
hdfs.rename(aFile.getPath(), new Path(myCustomName));
}
}
Run Code Online (Sandbox Code Playgroud)
Cha*_*guy 21
即使您正在使用MultipleOutputs,默认OutputFormat(我相信它TextOutputFormat)仍在使用,因此它将初始化并创建part-r-xxxxx您正在看到的这些文件.
它们是空的这一事实是因为你没有做context.write因为你正在使用它MultipleOutputs.但这并不妨碍在初始化期间创建它们.
要摆脱它们,你需要定义你OutputFormat说你不期望任何输出.你可以这样做:
job.setOutputFormat(NullOutputFormat.class);
Run Code Online (Sandbox Code Playgroud)
使用该属性集,这应该确保您的零件文件根本不会被初始化,但您仍然可以获得输出MultipleOutputs.
您也可以使用LazyOutputFormat哪个将确保仅在/如果有某些数据时创建输出文件,而不是初始化空文件.你可以这样做:
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
Run Code Online (Sandbox Code Playgroud)
请注意,您正在使用Reducer原型MultipleOutputs.write(String namedOutput, K key, V value),它只使用默认输出路径,该路径将基于您生成以下内容namedOutput:{namedOutput}-(m|r)-{part-number}.如果您想要更多地控制输出文件名,您应该使用原型MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath),它允许您根据键/值获取在运行时生成的文件名.
RHo*_*and 11
这是您在Driver类中需要做的所有操作,以更改输出文件的基本名称:
job.getConfiguration().set("mapreduce.output.basename", "text");
这将导致您的文件被称为"text-r-00000".
| 归档时间: |
|
| 查看次数: |
15939 次 |
| 最近记录: |