Hadoop流程记录如何跨块边界分割?

hadoop split mapreduce block hdfs

30347 观看

6回复

19283 作者的声誉

根据 Hadoop - The Definitive Guide

FileInputFormats定义的逻辑记录通常不适合HDFS块。例如,TextInputFormat的逻辑记录是行,它们将经常跨越HDFS边界。这与你的程序的功能没有关系 - 例如,线路不会丢失或损坏 - 但值得了解,因为它确实意味着数据本地地图(即,与他们在同一主机上运行的地图)输入数据)将执行一些远程读取。这导致的轻微开销通常不显着。

假设记录行分为两个块(b1和b2)。处理第一个块(b1)的映射器将注意到最后一行没有EOL分隔符,并从下一个数据块中取出剩余的行(b2)。

映射器如何处理第二个块(b2)如何确定第一个记录是不完整的并且应该从块(b2)中的第二个记录开始处理?

作者: Praveen Sripati 的来源 发布者: 2013 年 1 月 12 日

回应 6


7

7390 作者的声誉

我认为如下:InputFormat负责将数据拆分为逻辑拆分,同时考虑到数据的性质。
没有什么可以阻止它这样做,虽然它可以为作业增加显着的延迟 - 围绕所需的分割大小边界的所有逻辑和读取都将在jobtracker中发生。
最简单的记录感知输入格式是TextInputFormat。它的工作原理如下(据我从代码中理解) - 输入格式按大小创建拆分,无论行如何,但LineRecordReader始终:
a)跳过拆分中的第一行(或部分),如果不是第一次分割
b)在分割的边界之后读取一行(如果数据可用,那么它不是最后一次分割)。

作者: David Gruzman 发布者: 2013 年 1 月 12 日

3

2444 作者的声誉

根据我的理解,当FileSplit为第一个块初始化时,将调用默认构造函数。因此,start和length的值最初为零。在第一个块的处理结束时,如果最后一行不完整,那么长度的值将大于分割的长度,并且它也将读取下一个块的第一行。由于这个原因,第一个块的start值将大于零,在这种情况下,LineRecordReader将跳过第二个块的第一行。(见来源

如果第一个块的最后一行完成,则length的值将等于第一个块的长度,第二个块的start的值将为零。在这种情况下,LineRecordReader不会跳过第一行并从头开始读取第二个块。

说得通?

作者: aa8y 发布者: 2013 年 1 月 23 日

147

26046 作者的声誉

决定

有趣的问题,我花了一些时间查看代码的详细信息,这是我的想法。分割由客户端处理InputFormat.getSplits,因此查看FileInputFormat会提供以下信息:

  • 对于每个输入文件,获取文件长度,块的大小,并计算分割尺寸max(minSize, min(maxSize, blockSize)),其中maxSize对应于mapred.max.split.sizeminSizemapred.min.split.size
  • FileSplit根据上面计算的分割大小将文件分成不同的s。这里重要的是每个FileSplit都使用与start输入文件中的偏移量相对应的参数进行初始化。此时仍然没有处理线路。代码的相关部分如下所示:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

之后,如果你看一下由LineRecordReader哪个定义TextInputFormat,那就是处理这些行的地方:

  • 初始化时,LineRecordReader它会尝试实例化一个LineReader抽象,以便能够读取行FSDataInputStream。有2种情况:
  • 如果有CompressionCodec定义的,则此编解码器负责处理边界。可能与您的问题无关。
  • 如果没有编解码器,那就是感兴趣的地方:如果startInputSplit的不同于0,那么你回溯1个字符,然后跳过你遇到的第一行\ n或\ r \ n(Windows)!回溯非常重要,因为如果您的行边界与拆分边界相同,这可以确保您不会跳过有效行。这是相关代码:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

因此,由于分割是在客户端中计算的,因此映射器不需要按顺序运行,每个映射器都知道它是否需要丢弃第一行。

所以基本上如果你在同一个文件中每个100Mb有2行,并且为了简化,我们说分割大小是64Mb。然后,当计算输入拆分时,我们将得到以下场景:

  • 拆分1包含该块的路径和主机。初始化为200-200 = 0Mb,长度为64Mb。
  • 拆分2初始化为200-200 + 64 = 64Mb,长度为64Mb。
  • 拆分3在开始时初始化200-200 + 128 = 128Mb,长度64Mb。
  • 拆分4在开始时初始化200-200 + 192 = 192Mb,长度8Mb。
  • 映射器A将处理拆分1,启动为0,因此不要跳过第一行,并读取超出64Mb限制的完整行,因此需要远程读取。
  • 映射器B将处理分裂2,开始是!= 0因此跳过64Mb-1byte之后的第一行,这对应于100Mb处的行1的末尾仍处于分割2中,我们在分割2中有28Mb的行,所以远程读取剩余的72Mb。
  • 映射器C将处理分裂3,start是!= 0因此跳过128Mb-1byte之后的第一行,这对应于200Mb处的第2行的结尾,这是文件的结尾所以不做任何事情。
  • 映射器D与映射器C相同,只是它在192Mb-1byte之后查找换行符。
作者: Charles Menguy 发布者: 2013 年 1 月 26 日

0

1 作者的声誉

映射器不必进行通信。文件块是HDFS,当前映射器(RecordReader)可以读取具有该行剩余部分的块。这发生在幕后。

作者: user3507308 发布者: 2014 年 4 月 6 日

1

51 作者的声誉

从LineRecordReader.java的hadoop源代码构造函数:我找到一些评论:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

从中我相信hadoop将为每个分割读取一个额外的行(在当前分割结束时,在下一个分割中读取下一行),如果不是先分割,则第一行将被丢弃。这样就不会丢失和不完整的行记录

作者: Shenghai.Geng 发布者: 2015 年 1 月 27 日

15

28234 作者的声誉

Map Reduce算法不适用于文件的物理块。它适用于逻辑输入拆分。输入拆分取决于记录的写入位置。记录可能跨越两个Mappers。

该方法HDFS已经成立,它打破了非常大的文件到大块(例如,测量128MB),存储集群中的不同节点上的这些块的三个副本。

HDFS无法识别这些文件的内容。可能已在Block-a中启动记录,但该记录的结尾可能出现在Block-b中

为了解决这个问题,Hadoop使用存储在文件块中的数据的逻辑表示,称为输入拆分。当MapReduce作业客户端计算输入拆分时它会计算块中第一个完整记录的开始位置以及块中最后一个记录的结束位置

关键点:

在块中的最后一个记录不完整的情况下,输入分割包括下一个块的位置信息和完成记录所需的数据的字节偏移。

看看下面的图表。

在此输入图像描述

看看这篇文章和相关的SE问题:关于Hadoop / HDFS文件拆分

可以从文档中读取更多详细信息

Map-Reduce框架依赖于作业的InputFormat:

  1. 验证作业的输入规范。
  2. 将输入文件拆分为逻辑InputSplits,然后将每个输入文件分配给单个Mapper。
  3. 然后将每个InputSplit分配给单个Mapper进行处理。斯普利特可能是元组InputSplit[] getSplits(JobConf job,int numSplits)是处理这些事情的API。

FileInputFormat,扩展InputFormatimplements getSplits()方法。在grepcode上看一下这个方法的内部结构

作者: Ravindra babu 发布者: 2016 年 1 月 12 日
32x32