当前位置: 首页 > 知识库问答 >
问题:

按列进行火花重新分区,每个列的分区数是动态的

邴子实
2023-03-14

如何根据列中项数的计数来分区DataFrame。假设我们有一个包含100人的DataFrame(列是first_namecountry),我们希望为一个国家中的每10个人创建一个分区。

如果我们的数据集包含来自中国的80人,来自法国的15人,来自古巴的5人,那么我们需要8个分区用于中国,2个分区用于法国,1个分区用于古巴。

下面是无法工作的代码:

    null

有什么方法可以动态设置每个列的分区数吗?这将使创建分区数据集变得更加容易。

共有1个答案

翟丰茂
2023-03-14

由于spark对数据进行分区的方式,您将无法准确地实现这一目标。Spark获取在重新分区中指定的列,将该值散列到64B的长度中,然后对该值进行分区数的模运算。这样,分区的数量是确定的。它以这种方式工作的原因是,除了确保两边的散列相同之外,联接还需要在联接的左侧和右侧匹配数量的分区。

“我们希望在一个国家为每10个人创建一个分区。”

你到底想在这里完成什么?在一个分区中只有10行可能会对性能造成不良影响。您是否试图创建一个分区表,其中分区中的每个文件都被保证只有x行数?

  def repartition(partitionExprs: Column*): Dataset[T] = {
    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
  }

“df.repartition(8,$”country“,rand):这将为每个国家创建最多8个分区,因此它应该为中国创建8个分区,但法国和古巴的分区未知。法国可能在8个分区中,古巴可能在5个分区中。有关详细信息,请参阅此答案。”

就像明智一样,这是微妙的错误。只有8个分区,国家基本上在这8个分区中随机洗牌。

 类似资料:
  • 我有一个大的csv文件,其中包含以下格式的数据。 CityId1,名称,地址,........., zip 城市2、姓名、地址等,。。。。。。。,拉链 CityId1,名称,地址,........., zip ......... 城市名称、姓名、地址等,。。。。。。。,拉链 我正在对上面的csv文件执行以下操作: > df1。groupBy($“cityId”)。agg(收集列表(结构(cols.

  • 我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的

  • 我们开始在团队中尝试spark。在我们减少spark中的工作后,我们希望将结果写入S3,但是我们希望避免收集Spark结果。目前,我们正在为RDD的每个分区写文件,但是这会产生很多小文件。我们希望能够将数据聚合到几个文件中,这些文件按照写入文件的对象数量进行分区。例如,我们的总数据是100万个对象(这是不变的),我们希望生成40万个对象文件,而我们当前的分区生成大约2万个对象文件(这因每个作业而异

  • 我在任何地方都找不到如何在RDD内部执行重新分区?我知道您可以在RDD上调用重新分区方法来增加分区数量,但它是如何在内部执行的呢? 假设,最初有5个分区,他们有- 第一个分区 - 100 个元素 第二个分区 - 200 个元素 第 3 个分区 - 500 个元素 第 4 个分区 - 5000 个元素 第 5 分区 - 200 个元素 一些分区是倾斜的,因为它们是从HBase加载的,并且数据没有正确

  • 我需要从一个Hive表中读取数据并将其插入到另一个Hive表中。两个表的架构是相同的。该表按日期分区 步骤1:从Spark的源表中读取数据。 第 2 步:按列(国家、日期)和分区数重新分区为 4。 我只得到每个国家代码1个分区