使用hadoop计算集合交集并设置两个文件的记录差异

rav*_*don 5 java hadoop set-intersection

很抱歉在hadoop用户邮件列表和此处交叉发布此内容,但这对我来说非常紧迫.

我的问题如下:我有两个输入文件,我想确定

  • a)仅出现在文件1中的行数
  • b)仅出现在文件2中的行数
  • c)两者共有的行数(例如关于字符串相等)

例:

File 1:
a
b
c

File 2:
a
d
Run Code Online (Sandbox Code Playgroud)

每种情况的期望输出:

lines_only_in_1: 2         (b, c)
lines_only_in_2: 1         (d)
lines_in_both:   1         (a)
Run Code Online (Sandbox Code Playgroud)

基本上我的方法如下:我编写了自己的LineRecordReader,以便映射器接收一对由行(文本)和一个指示源文件(0或1)的字节组成的对.映射器只返回该对,所以它实际上什么都不做.然而,副作用是,组合器接收到

Map<Line, Iterable<SourceId>>
Run Code Online (Sandbox Code Playgroud)

(其中SourceId为0或1).

现在,对于每一行,我可以得到它出现的源集.因此,我可以编写一个组合器,计算每种情况(a,b,c)的行数(清单1)

然后组合器仅在清理时输出"摘要"(这是安全吗?).所以这个总结如下:

lines_only_in_1   2531
lines_only_in_2   3190
lines_in_both      901
Run Code Online (Sandbox Code Playgroud)

在减速器中,我只总结了这些摘要的值.(因此减速器的输出看起来与组合器的输出一样).

但是,主要问题是,我需要将两个源文件视为单个虚拟文件,该文件生成表单(line,sourceId)// sourceId 0或1的记录

而且我不确定如何实现这一目标.所以问题是我是否可以事先避免预处理和合并文件,并使用像虚拟合并文件阅读器和自定义记录阅读器这样的东西即时执行.任何代码示例都非常感谢.

最好的问候,克劳斯

清单1:

public static class SourceCombiner
    extends Reducer<Text, ByteWritable, Text, LongWritable> {

    private long countA = 0;
    private long countB = 0;
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            byte fileId = val.get();

            fileIds.add(fileId);
        }

        if(fileIds.contains((byte)0)) { ++countA; }
        if(fileIds.contains((byte)1)) { ++countB; }
        if(fileIds.size() >= 2) { ++countC; }
    }

    protected void cleanup(Context context)
            throws java.io.IOException, java.lang.InterruptedException
    {
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}
Run Code Online (Sandbox Code Playgroud)

Tho*_*lut 2

好吧,我必须承认,到目前为止,我并没有真正理解您所尝试的要点,但我有一个简单的方法来完成您可能需要的事情。

看一下文件映射器。这个将获取文件名并将其与输入的每一行一起提交。

    public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

        static Text fileName;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, fileName);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {

            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            fileName = new Text(name);
        }
    }
Run Code Online (Sandbox Code Playgroud)

现在我们有一堆像这样的键/值(关于您的示例)

    a File 1
    b File 1
    c File 1

    a File 2
    d File 2
Run Code Online (Sandbox Code Playgroud)

显然减少它们会给你这样的输入:

    a File 1,File 2
    b File 1
    c File 1
    d File 2
Run Code Online (Sandbox Code Playgroud)

您需要在减速器中执行的操作可能如下所示:

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HashSet<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }

        // if we have only two files and we have just two records in our hashset
        // the line is contained in both files
        if (set.size() == 2) {
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else {
            // sorry this is a bit dirty...
            String t = set.iterator().next();
            // determine which file it was by checking for the name:
            if(t.toString().equals("YOUR_FIRST_FILE_NAME")){
                context.getCounter(Counter.LINES_IN_FIRST).increment(1);
            } else {
                context.getCounter(Counter.LINES_IN_SECOND).increment(1);
            }
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

您必须将 if 语句中的字符串替换为您的文件名。

我认为使用作业计数器比使用自己的原语并将它们写入清理中的上下文更清晰一些。您可以在完成后调用以下内容来检索作业的计数器:

Job job = new Job(new Configuration());
//setup stuff etc omitted..
job.waitForCompletion(true);
// do the same line with the other enums
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue();
Run Code Online (Sandbox Code Playgroud)

尽管如此,如果您需要 HDFS 中的公共行数等,那么请寻找您的解决方案。

希望对您有帮助。