当前位置: 首页 > 面试题库 >

从S3并行读取多个文件(Spark,Java)

西门飞星
2023-03-14
问题内容

我对此进行了一些讨论,但还不太了解正确的解决方案:我想将S3中的数百个文件加载到RDD中。这是我现在的做法:

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
                withBucketName(...).
                withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated()

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));

ReadFromS3Function不使用实际的阅读AmazonS3客户端:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        S3Object object = s3Client.getObject(new GetObjectRequest(...));
        InputStream is = object.getObjectContent();
        List<String> lines = new LinkedList<>();
        String str;
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            if (is != null) {
                while ((str = reader.readLine()) != null) {
                    lines.add(str);
                }
            } else {
                ...
            }
        } finally {
            ...
        }
        return lines.iterator();

我从在Scala中针对相同问题看到的答案中“翻译”了一下。我认为也可以将整个路径列表传递给sc.textFile(...),但是我不确定哪种是最佳做法。


问题答案:

根本的问题是,在s3中列出对象的速度确实很慢,并且每当执行树遍历时,看起来像目录树的方式都会降低性能(就像路径的通配符模式处理一样)。

帖子中的代码正在列出所有子对象,这些方法可提供更好的性能,本质上是Hadoop 2.8和s3a
listFiles(路径,递归)附带的内容,请参见HADOOP-13208。

获得该清单后,您将获得对象路径的字符串,然后可以将其映射到s3a / s3n路径以将spark用作文本文件输入,然后将其应用于

val files = keys.map(key -> s"s3a://$bucket/$key").mkString(",")
sc.textFile(files).map(...)

并按要求提供以下Java代码。

String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey())); 
// repeat while objectListing truncated 
JavaRDD<String> events = sc.textFile(String.join(",", keys))

请注意,我已将s3n切换为s3a,因为只要在CP上具有hadoop-awsamazon- sdkJAR,s3a连接器就是您应该使用的连接器。更好,它是一种针对人员(我)针对火花工作负载进行维护和测试的工具。请参阅Hadoop
S3连接器的历史



 类似资料:
  • 问题内容: 我在S3中有一个csv文件,我正在尝试读取标题行以获取大小(这些文件是由我们的用户创建的,因此它们几乎可以是任何大小)。有没有办法使用boto做到这一点?我以为也许我们可以使用python BufferedReader,但是我不知道如何从S3键打开流。任何建议都很好。谢谢! 问题答案: 看来boto具有可以执行此操作的功能。这是一些对我有用的代码: 调用会从对象返回下一个n个字节。 当

  • 我试图从mysql读取数据,并将其写回s3中的parquet文件,具体分区如下: 我的问题是,它只打开一个到mysql的连接(而不是4个),并且在从mysql获取所有数据之前,它不会写入parquert,因为mysql中的表很大(100M行),进程在OutOfMemory上失败。 有没有办法将Spark配置为打开多个到mysql的连接并将部分数据写入镶木地板?

  • 问题内容: 我想从S3读取多个实木复合地板文件到一个数据帧中。目前,我正在使用以下方法执行此操作: 如果所有文件都存在于S3上,则此方法有效,但是当列表中的某些文件不存在时,我想请求将文件列表加载到数据帧中而不会中断。换句话说,我希望sparkSql可以将它找到的尽可能多的文件加载到数据帧中,并在不抱怨的情况下返回此结果。这可能吗? 问题答案: 是的,如果您将指定输入的方法更改为hadoop全局模

  • 问题内容: 我有一个文件,我想用Java读取并将其拆分为(用户输入)输出文件。这是我读取文件的方式: 如何将文件拆分为文件? 注意-由于文件中的条目数约为100k,因此我无法将文件内容存储到数组中,然后将其拆分并保存到多个文件中。 问题答案: 由于一个文件可能很大,因此每个拆分文件也可能很大。 例: 源文件大小:5GB 数字分割:5:目的地 档案大小:每个1GB(5个档案) 即使我们有这样的内存,

  • /tmp/data/myfile1.csv,/tmp/data/myfile2.csv,/tmp/data.myfile3.csv,/tmp/datamyfile4.csv 我希望将这些文件读入Spark DataFrame或RDD,并且希望每个文件都是DataFrame的一个解析。我怎么能这么做?

  • 编写了通过Spark读取文本文件的代码...在Local中运行良好...但在HDInsight中运行时产生错误->从Blob读取文本文件 org.apache.spark.sparkException:作业由于阶段失败而中止:阶段0.0中的任务0失败了4次,最近的失败:阶段0.0中丢失的任务0.3(TID 5,wn1-hchdin.bpqkkmavxs0ehkfnaruw4ed03d.dx.int