Hadoop流程记录如何跨块边界分割?

Pra*_*ati 118 hadoop split mapreduce block hdfs

根据 Hadoop - The Definitive Guide

FileInputFormats定义的逻辑记录通常不适合HDFS块.例如,TextInputFormat的逻辑记录是行,它们将经常跨越HDFS边界.这与你的程序的功能没有关系 - 例如,线路不会丢失或损坏 - 但值得了解,因为它确实意味着数据本地地图(即,与他们在同一主机上运行的地图)输入数据)将执行一些远程读取.这导致的轻微开销通常不显着.

假设记录行分为两个块(b1和b2).处理第一个块(b1)的映射器将注意到最后一行没有EOL分隔符,并从下一个数据块中取出剩余的行(b2).

映射器如何处理第二个块(b2)如何确定第一个记录是不完整的并且应该从块(b2)中的第二个记录开始处理?

Cha*_*guy 158

有趣的问题,我花了一些时间查看代码的详细信息,这是我的想法.分割由客户端处理InputFormat.getSplits,因此查看FileInputFormat会提供以下信息:

  • 对于每个输入文件,获取文件长度,块的大小,并计算分割尺寸max(minSize, min(maxSize, blockSize)),其中maxSize对应于mapred.max.split.sizeminSizemapred.min.split.size.
  • FileSplit根据上面计算的分割大小将文件分成不同的s.这里重要的是每个FileSplit都使用与start输入文件中的偏移量相对应的参数进行初始化.此时仍然没有处理线路.代码的相关部分如下所示:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    
    Run Code Online (Sandbox Code Playgroud)

之后,如果你看一下由LineRecordReader哪个定义TextInputFormat,那就是处理这些行的地方:

  • 初始化时,LineRecordReader它会尝试实例化一个LineReader抽象,以便能够读取行FSDataInputStream.有2种情况:
  • 如果有CompressionCodec定义的,则此编解码器负责处理边界.可能与您的问题无关.
  • 如果没有编解码器,那就是感兴趣的地方:如果startInputSplit的不同于0,那么你回溯1个字符,然后跳过你遇到的第一行\n或\ r \n(Windows)!原路返回是重要的,因为如果你的行边界是相同的分割界限,这样可以确保你不要跳过的有效行.这是相关代码:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    
    Run Code Online (Sandbox Code Playgroud)

因此,由于分割是在客户端中计算的,因此映射器不需要按顺序运行,每个映射器都知道它是否需要丢弃第一行.

所以基本上如果你在同一个文件中每个100Mb有2行,并且为了简化,我们说分割大小是64Mb.然后,当计算输入拆分时,我们将得到以下场景:

  • 拆分1包含该块的路径和主机.初始化为200-200 = 0Mb,长度为64Mb.
  • 拆分2初始化为200-200 + 64 = 64Mb,长度为64Mb.
  • 拆分3在开始时初始化200-200 + 128 = 128Mb,长度64Mb.
  • 拆分4在开始时初始化200-200 + 192 = 192Mb,长度8Mb.
  • 映射器A将处理拆分1,启动为0,因此不要跳过第一行,并读取超出64Mb限制的完整行,因此需要远程读取.
  • 映射器B将处理分裂2,开始是!= 0因此跳过64Mb-1byte之后的第一行,这对应于100Mb处的行1的末尾仍处于分割2中,我们在分割2中有28Mb的行,所以远程读取剩余的72Mb.
  • 映射器C将处理分裂3,start是!= 0因此跳过128Mb-1byte之后的第一行,这对应于200Mb处的第2行的结尾,这是文件的结尾所以不做任何事情.
  • 映射器D与映射器C相同,只是它在192Mb-1byte之后查找换行符.

  • @PraveenSripati在这种情况下,第二个映射器将看到start!= 0,因此回溯1个字符,它将您带回到第一行的\n之前,然后跳到下面的\n.因此它将跳过第一行但按预期处理第二行. (6认同)

Rav*_*abu 17

Map Reduce算法不适用于文件的物理块.它适用于逻辑输入拆分.输入拆分取决于记录的写入位置.记录可能跨越两个Mappers.

该方法HDFS已经成立,它打破了非常大的文件到大块(例如,测量128MB),存储集群中的不同节点上的这些块的三个副本.

HDFS无法识别这些文件的内容.可能已在Block-a中启动记录,但该记录的结尾可能出现在Block-b中.

为了解决这个问题,Hadoop使用存储在文件块中的数据的逻辑表示,称为输入拆分.当MapReduce作业客户端计算输入拆分时,它会计算块中第一个完整记录的开始位置以及块中最后一个记录的结束位置.

关键点:

在块中的最后一个记录不完整的情况下,输入分割包括下一个块的位置信息和完成记录所需的数据的字节偏移.

看看下面的图表.

在此输入图像描述

看看这篇文章和相关的SE问题:关于Hadoop/HDFS文件分割

可以从文档中读取更多详细信息

Map-Reduce框架依赖于作业的InputFormat:

  1. 验证作业的输入规范.
  2. 将输入文件拆分为逻辑InputSplits,然后将每个输入文件分配给单个Mapper.
  3. 然后将每个InputSplit分配给单个Mapper进行处理.斯普利特可能是元组. InputSplit[] getSplits(JobConf job,int numSplits)是处理这些事情的API.

FileInputFormat,扩展InputFormatimplements getSplits()方法.在grepcode上看一下这个方法的内部结构


Dav*_*man 7

我认为如下:InputFormat负责将数据拆分为逻辑拆分,同时考虑到数据的性质.
没有什么可以阻止它这样做,虽然它可以为作业增加显着的延迟 - 围绕所需的分割大小边界的所有逻辑和读取都将在jobtracker中发生.
最简单的记录感知输入格式是TextInputFormat.它的工作原理如下(据我从代码中理解) - 输入格式按大小创建拆分,无论行如何,但LineRecordReader始终:
a)跳过拆分中的第一行(或部分),如果不是第一次分割
b)在分割的边界之后读取一行(如果数据可用,那么它不是最后一次分割).