当前位置: 首页 > 知识库问答 >
问题:

Hadoop处理记录如何跨块边界拆分?

米俊喆
2023-03-14

根据Hadoop--权威指南

FileInputFormats定义的逻辑记录通常不适合HDFS块。例如,TextInputFormat的逻辑记录是行,它们通常会跨越HDFS边界。这与程序的功能无关--例如,行不会丢失或中断--但值得了解,因为这确实意味着数据本地映射(即与输入数据运行在同一主机上的映射)将执行一些远程读取。这导致的轻微开销通常并不显著。

假设记录行被分成两个块(b1和b2)。处理第一个块(b1)的映射器将注意到最后一行没有EOL分隔符,并从下一个数据块(b2)中获取该行的剩余部分。

处理第二块(b2)的映射器如何确定第一记录是不完整的,并且应该从块(b2)中的第二记录开始处理?

共有1个答案

拓拔麒
2023-03-14

有趣的问题,我花了一些时间查看代码的细节,下面是我的想法。客户机通过inputformat.getsplits处理拆分,因此查看FileInputFormat可以得到以下信息:

>

  • 对于每个输入文件,获取文件长度和块大小,并将拆分大小计算为max(minSize,min(maxSize,blockSize)),其中maxSize对应于mapred.max.split.sizeminSizemapred.min.split.size.
  • 根据上面计算的拆分大小将文件划分为不同的filesplit。这里重要的是,每个filesplit都用一个start参数初始化,该参数对应于输入文件中的偏移量。在那一点上仍然没有对线路的处理。代码的相关部分如下所示:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

    之后,如果查看由TextInputFormat定义的LineRecordReader,那么行就是在这里处理的:

    >

  • 初始化LineRecordReader时,它试图实例化LineReader,这是一种抽象,可以读取FSDataInputStream上的行。有2种情况:
  • 如果定义了compressioncodec,那么这个编解码器负责处理边界。可能与您的问题无关。
  • 但是,如果没有编解码器,这就是有趣的地方:如果inputsplitstart不同于0,那么您将回溯1个字符,然后跳过您遇到的由\n或\r\n(Windows)标识的第一行!回溯很重要,因为如果您的行边界与拆分边界相同,这将确保您不会跳过有效的行。以下是相关代码:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

    因此,由于拆分是在客户机中计算的,映射器不需要按顺序运行,每个映射器都已经知道是否需要丢弃第一行。

    所以基本上,如果在同一个文件中有两行,每行100MB,为了简化,假设拆分大小为64MB。然后当计算输入分裂时,我们将有以下场景:

    • 包含此块的路径和主机的拆分1。开始时初始化200-200=0MB,长度64MB.
    • 拆分2在开始时初始化200-200+64=64MB,长度64MB.
    • 拆分3在开始时初始化200-200+128=128MB,长度64MB.
    • 拆分4在开始时初始化200-200+192=192MB,长度8MB.
    • 映射器A将处理拆分1,开始是0,所以不要跳过第一行,而是读取超过64MB限制的整行,所以需要远程读取。
    • 映射器B将处理split 2,start is!=0所以跳过64MB-1Byte之后的第一行,它对应于第1行的100MB结尾,仍然在split 2中,我们在split 2中有28MB的行,所以远程读取剩余的72MB。
    • 映射器C将处理split 3,start is!=0所以跳过128MB-1byte之后的第一行,它对应于200MB的第2行的结尾,这是文件的结尾,所以不要做任何事情。
    • 映射器D与映射器C相同,只是它在192MB-1字节之后查找换行符。

  •  类似资料:
    • 假设我有一个输入文件,在HDFS中为这个文件创建了三个块。假设我有三个数据节点,每个数据节点存储一个块。如果我有3个输入拆分,则3个映射器将并行运行,以处理各自数据节点的本地数据。每个映射器使用输入格式和记录读取器以键值对的形式获取输入。此场景使用TextInputFormat,其中记录是来自文件的完整文本行。 这里的问题是,如果在第一个块的末尾有记录中断,会发生什么。 1)Hadoop如何读取此

    • 在apache文档中阅读以下内容: InputSplit表示单个映射器要处理的数据。 通常,它在输入上显示一个面向字节的视图,作业的RecordReader负责处理该输入并显示一个面向记录的视图。 链接-https://hadoop.apache.org/docs/r2.6.1/api/org/apache/hadoop/mapred/inputsplit.html 有人能解释一下面向字节的视图和

    • 我目前正在测试将现有应用程序迁移到拼图模块。我的一个模块使用ElasticSearch及其Groovy插件。 org.elasticsearch:elasticsearch org.elasticsearch.module: lang-groovy 不幸的是,他们共享一个拆分包,所以给了我: X从lang.groovy和elasticsearch读取包org.elasticsearch.scrip

    • 本文向大家介绍React v15中怎么处理错误边界?相关面试题,主要包含被问及React v15中怎么处理错误边界?时的应答技巧和注意事项,需要的朋友参考一下 React 15 中有一个支持有限的错误边界方法 unstable_handleError。此方法不再起作用,同时自 React 16 beta 发布起你需要在代码中将其修改为 componentDidCatch。

    • 我们有一个spring批处理程序,它可以访问读卡器中的一组数据,对其进行处理和写入。这一切都是成批发生的。 我注意到,处理器和写入程序对同一数据进行了两次检查,一次作为批处理,一次作为单个记录。 例如:写入器读取1000条记录,向处理器发送1000条记录,向写入器发送1000条记录。 在此之后,记录再次被单独处理,但只调用处理器和写入器。 我们在所有读卡器、处理器和写入程序中都有日志语句,我可以看

    • 我正在使用spring批处理使用RepositoryItemReader从postgresql DB读取记录,然后将其写入主题。我看到大约有100万条记录需要处理,但它并没有处理所有的记录。我已经将reader的pageSize设置为10,000并且与提交间隔(块大小)相同