并行化Hadoop中的Ruby reducer?

Era*_*mpf 4 ruby java hadoop mapreduce

Ruby中一个简单的wordcount reducer看起来像这样:

#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
keyval = line.split("|")
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i
end

wordcount.each_pair do |word,count|
puts "#{word}|#{count}"
end
Run Code Online (Sandbox Code Playgroud)

它在STDIN中获得所有映射器的中间值.不是来自特定的密钥.所以实际上只有一个减速器(而不是每个单词或每组单词的减速器).

但是,在Java示例中,我看到这个接口获取了一个键和值列表inout.这意味着在缩减和缩减器可以并行运行之前,按键对中间映射值进行分组:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
              int sum = 0;
              while (values.hasNext()) {
                sum += values.next().get();
              }
              output.collect(key, new IntWritable(sum));
            }
          }
Run Code Online (Sandbox Code Playgroud)

这是Java唯一的功能吗?或者我可以使用Ruby使用Hadoop Streaming吗?

Kev*_*eil 5

无论您是否使用流式传输,减少器将始终并行运行(如果您没有看到此情况,请验证作业配置是否设置为允许多个reduce任务 - 请参阅群集或作业配置中的mapred.reduce.tasks ).不同之处在于,当您使用Java与流式传输时,框架会为您提供更好的解决方案.

对于Java,reduce任务获取特定键的所有值的迭代器.如果您在reduce任务中对地图输出求和,则可以轻松地遍历值.在流式传输中,您实际上只获得了一对键值对.您可以保证按键排序值,并且对于给定键的值不会在reduce任务中拆分,但您需要的任何状态跟踪都取决于您.例如,在Java中,您的地图输出在表单中以符号形式出现在reducer中

key1,{val1,val2,val3} key2,{val7,val8}

使用流式传输,您的输出看起来像

key1,val1 key1,val2 key1,val3 key2,val7 key2,val8

例如,要编写计算每个键值的总和的reducer,您需要一个变量来存储您看到的最后一个键以及一个用于存储总和的变量.每次读取新的键值对时,都会执行以下操作:

  1. 检查密钥是否与最后一个密钥不同.
  2. 如果是,输出您的密钥和当前总和,并将总和重置为零.
  3. 将当前值添加到总和中,并将最后一个键设置为当前键.

HTH.