当前位置: 首页 > 知识库问答 >
问题:

如何用apache spark处理数百万个较小的s3文件

南门焱
2023-03-14
sc.wholeTextFiles(s3aPath + "/*/*/*/*.txt").count();

注意:计数是对处理文件需要多长时间的更多调试。这项工作几乎花了一整天的时间,超过10个实例,但仍然失败,错误发布在列表的底部。然后我找到了这个链接,它基本上说这不是最佳的:https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html

然后,我决定尝试另一个我目前找不到的解决方案,即加载所有路径,然后联合所有rdds

    ObjectListing objectListing = s3Client.listObjects(bucket);
    List<JavaPairRDD<String, String>> rdds = new ArrayList<>();
    List<JavaPairRDD<String, String>> tempMeta = new ArrayList<>();
    //initializes objectListing
    tempMeta.addAll(objectListing.getObjectSummaries().stream()
            .map(func)
            .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
            .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
            .collect(Collectors.toList()));

    while(objectListing.isTruncated()) {
        objectListing = s3Client.listNextBatchOfObjects(objectListing);
        tempMeta.addAll(objectListing.getObjectSummaries().stream()
                .map(func)
                .filter(item -> item != null && item.getMediaType().equalsIgnoreCase("transcript"))
                .map(item -> SparkConfig.getSparkContext().wholeTextFiles("s3a://" + bucket + "/" + item.getFileName()))
                .collect(Collectors.toList()));
        if (tempMeta.size() > 5000) {
            rdds.addAll(tempMeta);
            tempMeta = new ArrayList<>();
        }
    }

    if (!tempMeta.isEmpty()){
        rdds.addAll(tempMeta);
    }
    return SparkConfig.getSparkContext().union(rdds.get(0), rdds.subList(1, rdds.size()));

然后,即使将emrfs-site配置设置为:

{
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.consistent.retryPolicyType": "fixed",
      "fs.s3.consistent.retryPeriodSeconds": "15",
      "fs.s3.consistent.retryCount": "20",
      "fs.s3.enableServerSideEncryption": "true",
      "fs.s3.consistent": "false"
    }
}
17/02/15 19:15:41 INFO AmazonHttpClient: Unable to execute HTTP request: randomBucket.s3.amazonaws.com:443 failed to respond
org.apache.http.NoHttpResponseException: randomBucket.s3.amazonaws.com:443 failed to respond

顺便说一句,我在hdfs中生成了数百万个小文件,并尝试了同样的工作,在一个小时内就完成了。这让我觉得它是s3特定的。还有,我用的是s3a,不是普通的S3。

共有1个答案

司寇善
2023-03-14

如果您使用的是amazon EMR,那么您需要使用s3://URL;s3a://s是用于ASF版本的。

一个大问题是在s3中列出目录树需要多长时间,尤其是递归树遍历。spark代码假设它是一个快速的文件系统,其中列出dirs和声明文件的成本很低,而实际上每个操作需要1-4个HTTPS请求,即使在重用的HTTP/1.1连接上,这也是有害的。它可能很慢,你可以在日志中看到停顿。

这真正的伤害是,它是前面的分区,在那里发生了很多延迟,所以这是序列化的工作被带到膝盖上。

 类似资料:
  • 本文向大家介绍利用python如何处理百万条数据(适用java新手),包括了利用python如何处理百万条数据(适用java新手)的使用技巧和注意事项,需要的朋友参考一下 1、前言 因为负责基础服务,经常需要处理一些数据,但是大多时候采用awk以及java程序即可,但是这次突然有百万级数据需要处理,通过awk无法进行匹配,然后我又采用java来处理,文件一分为8同时开启8个线程并发处理,但是依然处

  • 我有以下用例: 当我的服务启动时,它可能需要在尽可能短的时间内处理数百万个文档。将有三个数据来源。 我已设置以下内容: 我的每个源调用consume都使用Guice来创建一个单例破坏者。 我的eventHandler例程是 我在日志中看到,制作人(

  • 需要使用到内存进行排序,但是短时间内排序又会导致内存益处

  • 用户勾选某一个分类,一次性查询该分类里的所有商品信息,并创建价格任务。 业务员在价格操作的时候,对刚才创建的所有商品信息进行价格操作,根据基准价做涨幅,每个商品都有不同的基准价,现在的问题就是数据量太大,如果一次性操作一万条商品数据,还要根据基准价做涨幅,有什么好的方法?前端如果展示的话,每条商品的涨幅后价格都要显示,请问该怎么操作比较好,后端这边应该怎么进行处理大批量的数据?

  • 在Amazon S3 bucket中,事件日志以CSV文件的形式每小时发送一次。我想执行一些简短的描述性分析1周价值的数据,每周(例如168个文件每周)。分析的重点是输出每周的趋势产品列表。我在本地机器上编写了一个python脚本,它使用boto3从S3检索最新的168个文件,并进行所有必要的争论等。 以下是从本地计算机获取168个文件的代码: 所以,我的问题是,我是否可以将这段代码放入一个lam

  • 微软Azure文档中没有提到这一点。正式的批量执行器文档只讨论插入和更新选项,而不是删除。有一个建议的java脚本服务器端程序来创建一个存储过程,听起来很不错,但这需要我们输入分区键值。如果我们的文档分布在数百万个逻辑分区上,那就没有意义了。 这是一个非常简单的业务需求。在迁移sql api cosmos集合中的大量数据时,如果我们插入了一些错误的数据,似乎没有选择删除其他数据然后恢复到以前的状态