Chaining of MapReduce jobs

bas*_*ath 3 hadoop mapreduce

I came across the term "chaining of MapReduce jobs". As a newcomer to MapReduce, in what situations do we have to chain jobs (I assume chaining means running MapReduce jobs sequentially, one after another)?

Are there any examples that would help?

yur*_*gis 12

The classic example of jobs that have to be chained is a word count that outputs the words sorted by their frequency.

You will need:

Job 1:

  • a source input mapper (emits the word as the key and one as the value)
  • an aggregating reducer (sums up the word counts)

Job 2:

  • a key/value swapping mapper (emits the frequency as the key and the word as the value)
  • the implicit identity reducer (receives the words sorted by frequency; it does not have to be implemented)
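
For instance, given the input "to be or not to be", job 1 emits the pairs (be, 2), (not, 1), (or, 1), (to, 2); job 2 swaps each pair into (2, be), (2, to), (1, not), (1, or), and the shuffle's sort on the new keys puts the most frequent words first (the relative order of words with the same frequency is not guaranteed).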

Here is an example of the mappers and reducers described above:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class HadoopWordCount {

  // Job 1 mapper: tokenizes each input line and emits (word, 1).
  public static class TokenizerMapper extends Mapper<Object, Text, Text, LongWritable> {

    private final static Text word = new Text();
    private final static LongWritable one = new LongWritable(1);

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Job 2 mapper: swaps each (word, frequency) pair into (frequency, word)
  // so that the shuffle sorts the records by frequency.
  public static class KeyValueSwappingMapper extends Mapper<Text, LongWritable, LongWritable, Text> {

    public void map(Text key, LongWritable value, Context context) throws IOException, InterruptedException {
      context.write(value, key);
    }
  }

  // Job 1 reducer (also used as the combiner): sums the counts per word.
  public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable result = new LongWritable();

    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
        InterruptedException {
      long sum = 0;
      for (LongWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  // The main() driver shown below completes this class.
}

Here is an example of the driver.

It expects two arguments:

  1. An input text file to count the words in.
  2. An output directory (it must not already exist); look for the final output in the {this dir}/out2/part-r-00000 file.

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Path out = new Path(args[1]);

        // Job 1: the classic word count. SequenceFile output preserves the
        // typed (Text, LongWritable) pairs for job 2 to consume directly.
        Job job1 = Job.getInstance(conf, "word count");
        job1.setJarByClass(HadoopWordCount.class);
        job1.setMapperClass(TokenizerMapper.class);
        job1.setCombinerClass(SumReducer.class);
        job1.setReducerClass(SumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(out, "out1"));
        if (!job1.waitForCompletion(true)) {
          System.exit(1);
        }

        // Job 2: swaps keys and values. No reducer class is set, so the
        // default identity reducer writes out the frequency-sorted pairs.
        Job job2 = Job.getInstance(conf, "sort by frequency");
        job2.setJarByClass(HadoopWordCount.class);
        job2.setMapperClass(KeyValueSwappingMapper.class);
        // A single reduce task yields one totally ordered output file.
        job2.setNumReduceTasks(1);
        // Sort the keys in decreasing order, i.e. most frequent words first.
        job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
        job2.setOutputKeyClass(LongWritable.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job2, new Path(out, "out1"));
        FileOutputFormat.setOutputPath(job2, new Path(out, "out2"));
        if (!job2.waitForCompletion(true)) {
          System.exit(1);
        }
    }
    
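To run the chained jobs, package the class into a jar and pass the two arguments, e.g. hadoop jar wordcount.jar HadoopWordCount books.txt /tmp/wc (the jar, input, and output names here are only illustrative).

The driver above chains the two jobs by hand, with sequential waitForCompletion calls. Hadoop also ships a JobControl/ControlledJob API for declaring such dependencies explicitly. Below is a minimal sketch of that alternative, assuming job1 and job2 are the fully configured Job instances from the driver above; the runChained wrapper and the polling interval are my own choices, not part of the original answer:

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobChain {

  // Declares job2 as depending on job1 and lets JobControl drive both.
  public static void runChained(Job job1, Job job2) throws IOException, InterruptedException {
    ControlledJob cJob1 = new ControlledJob(job1, null); // no dependencies
    ControlledJob cJob2 = new ControlledJob(job2, Collections.singletonList(cJob1)); // waits for job1

    JobControl control = new JobControl("word-count-chain");
    control.addJob(cJob1);
    control.addJob(cJob2);

    new Thread(control).start();     // JobControl implements Runnable
    while (!control.allFinished()) { // poll until both jobs are done
      Thread.sleep(500);
    }
    control.stop();
  }
}

JobControl only submits job2 once job1 completes successfully; if a dependency fails, the dependent job is marked as failed too, and can be inspected via control.getFailedJobList().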