MapReduce: how to make a mapper process multiple lines?

csi*_*siu 2 java hadoop split mapreduce input

Goal:

  • I would like to be able to specify the number of mappers used on an input file
  • Equivalently, I would like to specify the number of lines of the file each mapper will take

A simple example:

For an input file of 10 lines (of unequal length; sample below), I would like to have 2 mappers, each processing 5 lines.

This is
an arbitrary example file
of 10 lines.
Each line does
not have to be
of
the same
length or contain
the same
number of words

This is what I have:

(I have set it up so that each mapper produces one "<map,1>" key-value pair, so that the pairs can be summed in the reducer.)

package org.myorg;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.InputFormat;


public class Test {

  // produce one "<map,1>" pair per mapper (intended; in practice map() runs once per input line)
  public static class Map extends Mapper<Object, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      context.write(new Text("map"), one);
    }
  }

  // reduce by taking a sum
  public static class Red extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {      
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }


  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job1 = Job.getInstance(conf, "pass01");

    job1.setJarByClass(Test.class);
    job1.setMapperClass(Map.class);
    job1.setCombinerClass(Red.class);
    job1.setReducerClass(Red.class);

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job1, new Path(args[0]));
    FileOutputFormat.setOutputPath(job1, new Path(args[1]));

    // // Attempt#1 -- has no effect: Job.getInstance(conf) copied the
    // // Configuration, so changes to "conf" after that point are ignored.
    // conf.setInt("mapreduce.input.lineinputformat.linespermap", 5);
    // job1.setInputFormatClass(NLineInputFormat.class);

    // // Attempt#2 -- this one does configure two splits (it writes to the
    // // Job's own configuration), but the output is still "map 10" because
    // // map() is invoked once per line, not once per mapper.
    // NLineInputFormat.setNumLinesPerSplit(job1, 5);
    // job1.setInputFormatClass(NLineInputFormat.class);

    // // Attempt#3 -- same pitfall as #1: NLineInputFormat.LINES_PER_MAP is
    // // the same key as in #1, but it is set on the stale "conf" object.
    // conf.setInt(NLineInputFormat.LINES_PER_MAP, 5);
    // job1.setInputFormatClass(NLineInputFormat.class);

    // // Attempt#4 -- same pitfall as #1; these keys bound FileInputFormat
    // // split sizes in bytes, a blunt way to control the mapper count.
    // conf.setInt("mapreduce.input.fileinputformat.split.minsize", 234);
    // conf.setInt("mapreduce.input.fileinputformat.split.maxsize", 234);

    System.exit(job1.waitForCompletion(true) ? 0 : 1);
  }
}

With the sample data above, the code above produces

map 10

I want the output to be

map 2

where the first mapper would do something with the first 5 lines, and the second mapper would do something with the last 5 lines.
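The accounting the question is after can be sketched in plain Java, outside Hadoop (class and method names below are illustrative, not Hadoop API): group the ten lines into splits of five and emit a single ("map", 1) pair per split, so the summed count equals the number of mappers rather than the number of lines.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitCount {
    // Partition lines into consecutive splits of at most linesPerSplit each,
    // mimicking how an N-lines-per-mapper input format would carve up a file.
    static List<List<String>> split(List<String> lines, int linesPerSplit) {
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += linesPerSplit) {
            splits.add(lines.subList(i, Math.min(i + linesPerSplit, lines.size())));
        }
        return splits;
    }

    // One "<map,1>" per split (i.e., per mapper), then a summing reduce.
    static int countMappers(List<String> lines, int linesPerSplit) {
        int sum = 0;
        for (List<String> s : split(lines, linesPerSplit)) {
            sum += 1; // each mapper emits exactly one pair
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("This is", "an arbitrary example file",
                "of 10 lines.", "Each line does", "not have to be",
                "of", "the same", "length or contain", "the same", "number of words");
        System.out.println("map " + countMappers(lines, 5)); // prints "map 2"
    }
}
```

In real Hadoop terms this corresponds to emitting the pair once per task (e.g., from the mapper's `cleanup()` method) instead of once per `map()` call.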

Ash*_*ith 7

You can use NLineInputFormat.

With NLineInputFormat you can specify exactly how many lines of input each mapper receives. For example, if your file has 500 lines and you set the number of lines per mapper to 10, you get 50 mappers (instead of one, assuming the file is smaller than an HDFS block).
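As a quick sanity check of that arithmetic, the number of map tasks this produces for a single file is a ceiling division (a plain-Java sketch; the class and method names are illustrative):

```java
public class NumSplits {
    // Number of map tasks created for one file when each mapper takes
    // linesPerMap lines: one per full group, plus one for any remainder.
    static int numSplits(int totalLines, int linesPerMap) {
        return (totalLines + linesPerMap - 1) / linesPerMap; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(numSplits(500, 10)); // 50 mappers, as in the example
        System.out.println(numSplits(10, 5));   // 2 mappers, as in the question
    }
}
```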

Edit:

Here is an example of using NLineInputFormat:

Mapper class:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperNLine extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

        context.write(key, value);
    }

}

Driver class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.out
                  .printf("Two parameters are required for DriverNLineInputFormat- <input dir> <output dir>\n");
            return -1;
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("NLineInputFormat example");
        job.setJarByClass(Driver.class);

        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.addInputPath(job, new Path(args[0]));
        job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", 5);

        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapperNLine.class);
        job.setNumReduceTasks(0);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(exitCode);
    }
}

With the input you provided, the output of the example Mapper above would be written to two files, since 2 mappers are initialized:

part-m-00001

0   This is
8   an arbitrary example file
34  of 10 lines.
47  Each line does
62  not have to be

part-m-00002

77  of
80  the same
89  length or contain
107 the same
116 number of words
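The keys in those part files are the LongWritable byte offsets at which each line starts. Assuming single-byte characters and '\n' line terminators, they can be reproduced in plain Java (the class name here is illustrative):

```java
public class LineOffsets {
    // Byte offset at which each line begins: each line contributes its
    // length plus one byte for the terminating newline.
    static long[] offsets(String[] lines) {
        long[] out = new long[lines.length];
        long pos = 0;
        for (int i = 0; i < lines.length; i++) {
            out[i] = pos;
            pos += lines[i].length() + 1; // +1 for '\n'
        }
        return out;
    }

    public static void main(String[] args) {
        String[] lines = {"This is", "an arbitrary example file", "of 10 lines.",
                "Each line does", "not have to be", "of", "the same",
                "length or contain", "the same", "number of words"};
        for (long o : offsets(lines)) System.out.print(o + " ");
        // prints: 0 8 34 47 62 77 80 89 107 116
    }
}
```

These match the keys shown in part-m-00001 and part-m-00002 above.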