当前位置: 首页 > 知识库问答 >
问题:

在hadoop中使用可变长度/非分隔二进制文件进行拆分

东方英豪
2023-03-14

我刚刚开始开发一个基于hadoop的开放式街道地图数据的ingester。有几种格式--但我的目标是基于protocolbuffer的格式(注意--它不是纯粹的pb)。

在我看来,与以自定义记录读取器/输入格式处理可变长度编码相反,将文件预拆分为序列文件将更有效,但希望进行健全性检查。

该格式在PBF格式描述中有更详细的描述,但基本上它是[BlobHeader,blob]块的集合。

有一个Blob头

message BlobHeader {
   required string type = 1;
   optional bytes indexdata = 2;
   required int32 datasize = 3;
 }
 message Blob {
   optional bytes raw = 1; // No compression
   optional int32 raw_size = 2; // Only set when compressed, to the uncompressed size
   optional bytes zlib_data = 3;
   // optional bytes lzma_data = 4; // PROPOSED.
   // optional bytes OBSOLETE_bzip2_data = 5; // Deprecated.
 }

有没有更好的方法来处理这个问题--或者如果没有,对这两个建议的想法?

共有1个答案

孔鸿远
2023-03-14

嗯,我使用getSplits方法解析二进制文件--因为我跳过了99%以上的html" target="_blank">数据,所以它非常快(对于planet-osm 22GB的世界文件来说大约20秒)。下面是getSplits方法,如果有人遇到困难的话。

@Override
public List<InputSplit> getSplits(JobContext context){
    List<InputSplit> splits = new ArrayList<InputSplit>();
    FileSystem fs = null;
    Path file = OSMPBFInputFormat.getInputPaths(context)[0]; 
    FSDataInputStream in = null;
    try {
        fs = FileSystem.get(context.getConfiguration());
        in = fs.open(file);
        long pos = 0;
        while (in.available() > 0){
            int len = in.readInt(); 
            byte[] blobHeader = new byte[len]; 
            in.read(blobHeader);
            BlobHeader h = BlobHeader.parseFrom(blobHeader);
            FileSplit split = new FileSplit(file, pos,len + h.getDatasize(), new String[] {});
            splits.add(split);
            pos += 4;
            pos += len;
            pos += h.getDatasize();
            in.skip(h.getDatasize());
        }
    } catch (IOException e) {
        sLogger.error(e.getLocalizedMessage());
    } finally {
        if (in != null) {try {in.close();}catch(Exception e){}};
        if (fs != null) {try {fs.close();}catch(Exception e){}};
    }
    return splits;
}

到目前为止工作得很好--尽管我还没有得到真实的输出结果。它比将pbf复制到hdfs更快,在单个映射器中转换为序列,然后摄入(复制时间占主导地位)。它还比将外部程序复制到hdfs中的序列文件,然后针对hdfs运行映射器(为后者编写脚本)快20%。所以这里没有抱怨。

请注意,这将为每个块生成一个映射器--对于planet world文件来说,这是23K映射器。我实际上是在每个拆分捆绑多个块--在将拆分添加到集合之前,只需循环x次。

对于BlobHeader,我只是从上面的OSM wiki链接编译了protobuf.proto文件。如果需要,还可以从OSM-binary类中提取预生成的代码--maven片段为:

<dependency>
    <groupId>org.openstreetmap.osmosis</groupId>
    <artifactId>osmosis-osm-binary</artifactId>
    <version>0.43-RELEASE</version>
</dependency>
 类似资料:
  • 根据我的理解,应该在换行符处精确拆分,但根据我在网站上看到的一些答案,我似乎错了。有人有更好的解释吗?哪个选择是正确的? 以下哪项最能描述的工作方式? > 输入文件拆分可以交叉换行。跨越文件拆分的行由包含折线结尾的拆分的读取。 输入文件正好在换行符处拆分,因此每个记录读取器将读取一系列完整的行。 输入文件拆分可能会交叉换行符。将忽略横过平铺拆分的线。 输入文件拆分可能会交叉换行符。跨越文件拆分的一

  • 问题内容: 我正在尝试将一个二进制文件(如视频/音频/图像)分成每个100kb的块,然后将这些块重新连接回原来的文件。我的代码似乎可以正常工作,从某种意义上说,它可以分割文件并合并块,我返回的文件大小与原始文件相同。但是,问题在于内容会被截断- 也就是说,如果它是视频文件,它将在2秒钟后停止,如果它是图像文件,则只有上部看起来正确。 这是我正在使用的代码(如果您愿意,我可以发布整个代码): 划分:

  • vendor 文件是什么? 任何一个 Composer 程序包想要传递给安装包的用户的命令行脚本都应该被列为 vendor 文件。 如果包包含了包用户不需要的其他脚本(如构建或编译脚本),则代码不应被列为 vendor 文件。 它是如何定义的? 它是通过将 bin 键添加到项目中来定义的 composer.json。它被指定为文件数组,因此可以为任何给定项目添加多个二进制文件。 { "bi

  • 我正在使用PyCharm和PySpark运行一个巨大的文本文件。 这就是我想做的: 但是,PyCharm抛给我这个错误: 我只是不明白为什么。toPandas()不工作。Spark版本为2.3。这个版本有什么我不知道的变化吗?我用spark 2.2在另一台机器上运行了这段代码,它运行得很好。 我甚至把出口线改成了这样 仍然得到相同的错误。我做错了什么?是否有其他方法可以在不影响性能的情况下将导出到

  • objdump工具用来显示二进制文件的信息,就是以一种可阅读的格式让你更多地了解二进制文件可能带有的附加信息。 14.1. 常用参数说明 -f 显示文件头信息 -D 反汇编所有section (-d反汇编特定section) -h 显示目标文件各个section的头部摘要信息 -x 显示所有可用的头信息,包括符号表、重定位入口。-x 等价于 -a -f -h -r -t 同时指定。 -i 显示对于

  • 嗯,我下载了winutils.exe,创建了“C:\winutils\bin”并复制了winutils。还创建了环境路径HADOOP_HOME。但我不明白为什么它不起作用。我的代码