当前位置: 首页 > 知识库问答 >
问题:

每个分区中对象数量的火花合并

西门飞翮
2023-03-14

我们开始在团队中尝试spark。在我们减少spark中的工作后,我们希望将结果写入S3,但是我们希望避免收集Spark结果。目前,我们正在为RDD的每个分区写文件,但是这会产生很多小文件。我们希望能够将html" target="_blank">数据聚合到几个文件中,这些文件按照写入文件的对象数量进行分区。例如,我们的总数据是100万个对象(这是不变的),我们希望生成40万个对象文件,而我们当前的分区生成大约2万个对象文件(这因每个作业而异)。理想情况下,我们希望生成3个文件,每个文件包含400k、400k和200k,而不是50个20K对象的文件

有人有好的建议吗?

我的思考过程是让每个分区处理它应该写入的索引,假设每个分区将大致产生相同数量的对象。例如,分区0将写入第一个文件,而分区21将写入第二个文件,因为它将假设对象的起始索引为20000*21=42000,这比文件大小大。分区41将写入第三个文件,因为它大于2*文件大小限制。然而,这并不总是导致完美的400k文件大小限制,更多的是近似值。

我知道有合并,但正如我所理解的,合并是根据想要的分区数量减少分区数量。我想要的是根据每个分区中对象的数量合并数据,有没有好的方法可以做到这一点?

共有2个答案

宋航
2023-03-14

我们已经决定继续使用生成的文件数量,并确保每个文件包含少于100万个行项目

张光辉
2023-03-14

您要做的是将文件重新分区为三个分区;数据将拆分为每个分区大约 333K 条记录。分区将是近似的,每个分区不会正好是 333,333。我不知道有什么方法可以获得您想要的 400k/400k/200k 分区。

如果您有一个DataFrame'df',您可以重新分区为n个分区

df.repartition(n)

由于您需要每个分区的最大记录数,我建议您这样做(您没有指定Scala或pyspark,所以我选择Scala;您可以在pyspard中这样做):

val maxRecordsPerPartition = ???
val numPartitions = (df.count() / maxRecordsPerPartition).toInt + 1
df
    .repartition(numPartitions)
    .write
    .format('json')
    .save('/path/file_name.json')

这将确保您的分区小于maxRecordsPerPartition。

 类似资料:
  • 我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的

  • 我有5个表存储为CSV文件(A.CSV、B.CSV、C.CSV、D.CSV、E.CSV)。每个文件按日期分区。如果文件夹结构如下:

  • 如何根据列中项数的计数来分区DataFrame。假设我们有一个包含100人的DataFrame(列是和),我们希望为一个国家中的每10个人创建一个分区。 如果我们的数据集包含来自中国的80人,来自法国的15人,来自古巴的5人,那么我们需要8个分区用于中国,2个分区用于法国,1个分区用于古巴。 下面是无法工作的代码: null 有什么方法可以动态设置每个列的分区数吗?这将使创建分区数据集变得更加容易

  • 谁能给我解释一下吗? 然而,另一方面是,对于不能保证产生已知分区的转换,输出RDD将没有分区器集。例如,如果对哈希分区的键/值对RDD调用map(),则传递给map()的函数在理论上可以更改每个元素的键,因此结果将不会有分区器。Spark不会分析函数以检查它们是否保留密钥。相反,它提供了另外两个操作,mapValues()和flatMap Values(),它们保证每个元组的键保持不变。 Mate

  • [新加入Spark]语言-Scala 根据文档,RangePartitioner对元素进行排序并将其划分为块,然后将块分发到不同的机器。下面的例子说明了它是如何工作的。 假设我们有一个数据框,有两列,一列(比如“a”)的连续值从1到1000。还有另一个数据帧具有相同的模式,但对应的列只有4个值30、250、500、900。(可以是任意值,从1到1000中随机选择) 如果我使用RangePartit

  • 我有一个大的csv文件,其中包含以下格式的数据。 CityId1,名称,地址,........., zip 城市2、姓名、地址等,。。。。。。。,拉链 CityId1,名称,地址,........., zip ......... 城市名称、姓名、地址等,。。。。。。。,拉链 我正在对上面的csv文件执行以下操作: > df1。groupBy($“cityId”)。agg(收集列表(结构(cols.