zer*_*ool 2 java hadoop mapreduce
我试图通过每个单词来反转文件的内容。我的程序运行良好,但我得到的输出是这样的
1 dwp
2 seviG
3 eht
4 tnerruc
5 gnikdrow
6 yrotcerid
7 ridkm
8 desU
9 ot
10 etaerc
Run Code Online (Sandbox Code Playgroud)
我希望输出是这样的
dwp seviG eht tnerruc gnikdrow yrotcerid ridkm desU
ot etaerc
Run Code Online (Sandbox Code Playgroud)
我正在使用的代码
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class Reproduce {
public static int temp =0;
public static class ReproduceMap extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text>{
private Text word = new Text();
@Override
public void map(LongWritable arg0, Text value,
OutputCollector<IntWritable, Text> output, Reporter reporter)
throws IOException {
String line = value.toString().concat("\n");
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(new StringBuffer(tokenizer.nextToken()).reverse().toString());
temp++;
output.collect(new IntWritable(temp),word);
}
}
}
public static class ReproduceReduce extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text>{
@Override
public void reduce(IntWritable arg0, Iterator<Text> arg1,
OutputCollector<IntWritable, Text> arg2, Reporter arg3)
throws IOException {
String word = arg1.next().toString();
Text word1 = new Text();
word1.set(word);
arg2.collect(arg0, word1);
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(ReproduceMap.class);
conf.setReducerClass(ReproduceReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Run Code Online (Sandbox Code Playgroud)
我如何修改我的输出而不是编写另一个 Java 程序来做到这一点
提前致谢
下面是一个简单的代码演示自定义FileoutputFormat的使用
public class MyTextOutputFormat extends FileOutputFormat<Text, List<IntWritable>> {
@Override
public org.apache.hadoop.mapreduce.RecordWriter<Text, List<Intwritable>> getRecordWriter(TaskAttemptContext arg0) throws IOException, InterruptedException {
//get the current path
Path path = FileOutputFormat.getOutputPath(arg0);
//create the full path with the output directory plus our filename
Path fullPath = new Path(path, "result.txt");
//create the file in the file system
FileSystem fs = path.getFileSystem(arg0.getConfiguration());
FSDataOutputStream fileOut = fs.create(fullPath, arg0);
//create our record writer with the new file
return new MyCustomRecordWriter(fileOut);
}
}
public class MyCustomRecordWriter extends RecordWriter<Text, List<IntWritable>> {
private DataOutputStream out;
public MyCustomRecordWriter(DataOutputStream stream) {
out = stream;
try {
out.writeBytes("results:\r\n");
}
catch (Exception ex) {
}
}
@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
//close our file
out.close();
}
@Override
public void write(Text arg0, List arg1) throws IOException, InterruptedException {
//write out our key
out.writeBytes(arg0.toString() + ": ");
//loop through all values associated with our key and write them with commas between
for (int i=0; i<arg1.size(); i++) {
if (i>0)
out.writeBytes(",");
out.writeBytes(String.valueOf(arg1.get(i)));
}
out.writeBytes("\r\n");
}
}
Run Code Online (Sandbox Code Playgroud)
最后,我们需要在运行之前告诉我们的工作我们的输出格式和路径。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ArrayList.class);
job.setOutputFormatClass(MyTextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/out"));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2940 次 |
| 最近记录: |