当前位置: 首页 > 知识库问答 >
问题:

优化从s3 bucket中分区拼花文件的读取

毛宏达
2023-03-14

我有一个拼花格式的大数据集(大小约1TB),分为2个层次:类和日期,只有7个类。但从2020年1月1日起,这一日期不断增加。我的数据首先按进行分区,然后按日期

比如:

CLASS1---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

CLASS2---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

我在for循环中按类加载数据。如果我加载整个拼花地板文件,由于它重载了内存实例,所以纱线会终止作业。但自从我在建模中进行百分位计算以来,我每天都在加载。此方法大约需要23小时才能完成。

但是,如果我重新分区以使我只有CLASS分区,则作业需要大约10小时。有太多的子分区会减慢火花执行器作业的速度吗?我将分区层次结构保留为CLASS-

非常感谢。

编辑:我在文件结构上使用for循环按分区循环,如下所示:

fs = s3fs.S3FileSystem(anon=False)    
inpath="s3://bucket/file.parquet/"

Dirs= fs.ls(inpath)
for paths in Dirs:
    customPath='s3://' + uvapath + '/'
    class=uvapath.split('=')[1]
    df=spark.read.parquet(customPath)
    outpath="s3://bucket/Output_" + class + ".parquet"
#Perform calculations
df.write.mode('overwrite').parquet(outpath)

加载的df将包含CLASS=1的所有日期。然后,我将该文件作为每个类的单独拼花文件输出,这样我就有7个拼花文件:

Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet

然后,我将7个拼花合并到一个拼花中并不是问题,因为生成的拼花文件要小得多。


共有1个答案

乐正嘉瑞
2023-03-14

我有三个列的分区数据,年份、月份和id。文件夹路径层次结构是

year=2020/month=08/id=1/*.parquet
year=2020/month=08/id=2/*.parquet
year=2020/month=08/id=3/*.parquet
...
year=2020/month=09/id=1/*.parquet
year=2020/month=09/id=2/*.parquet
year=2020/month=09/id=3/*.parquet

我可以通过加载根路径来读取数据帧。

val df = spark.read.parquet("s3://mybucket/")

然后,分区列会自动添加到DataFrame中。现在,您可以通过以下方式过滤分区列的数据

val df_filtered = df.filter("year = '2020' and month = '09'")

并使用df_filtered做一些事情,那么火花将仅使用分区数据!

对于重复处理,可以使用spark的公平调度程序。添加展会。使用以下代码将xml文件转换为项目的src/main/resources,

<?xml version="1.0"?>

<allocations>
    <pool name="fair">
        <schedulingMode>FAIR</schedulingMode>
        <weight>10</weight>
        <minShare>0</minShare>
    </pool>
</allocations>

并在创建spark会话后设置spark配置。

spark.sparkContext.setLocalProperty("spark.scheduler.mode", "FAIR")
spark.sparkContext.setLocalProperty("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")

然后您可以并行完成您的工作。您可能希望并行化作业取决于类,因此

val classes = (1 to 7).par
val date = '2020-09-25'

classes foreach { case i =>

    val df_filtered = df.filter(s"CLASS == '$i' and DATE = '$date'")
    
    // Do your job

}

代码将使用不同的类值同时工作。

 类似资料:
  • 如何读取带有条件作为数据帧的分区镶木地板, 这工作得很好, 分区存在的时间为< code>day=1到day=30是否可能读取类似于< code>(day = 5到6)或< code>day=5,day=6的内容, 如果我输入< code>*,它会给出所有30天的数据,而且太大了。

  • 我们正在寻找一种解决方案,以便创建一个外部配置单元表,根据parquet/avro模式从parquet文件中读取数据。 换句话说,如何从拼花/avro模式生成hive表? 谢谢:)

  • 我需要从不是父目录或子目录的多个路径读取拼花地板文件。 例如, 从dir1\u 1和dir1\u 2读取拼花文件 现在,我正在读取每个目录并使用“unionAll”合并数据帧。有没有一种方法可以不使用unionAll从dir1\u 2和dir2\u 1读取拼花地板文件,或者有没有什么奇特的方法可以使用unionAll 谢谢

  • 则错误如下: AttributeError:“property”对象没有属性“parquet”

  • 我已经使用Spark生成了一些分区拼花地板数据,我想知道如何将其映射到Impala表。。。遗憾的是,我还没有找到任何解决办法。 拼花地板的架构如下: 我用和对其进行了分区,这为我的hdfs提供了这种目录: 您知道我如何告诉Impala从这个数据集创建一个具有相应分区的表(并且不必像我读到的那样在每个分区上循环)?有可能吗? 提前谢谢你

  • 我试图利用火花分区。我试图做这样的事情 这里的问题每个分区都会创建大量的镶木地板文件,如果我尝试从根目录读取,则会导致读取缓慢。 为了避免这种情况,我试过 但是,这将创建每个分区中镶木地板文件的数目。现在我的分区大小不同了。因此,理想情况下,我希望每个分区都有单独的合并。然而,这看起来并不容易。我需要访问所有分区合并到一定数量并存储在单独的位置。 我应该如何使用分区来避免写入后出现许多文件?