当前位置: 首页 > 知识库问答 >
问题:

用于有效连接Spark数据帧/数据集的分区数据

贾沛
2023-03-14

我需要根据一些共享的键列将许多数据帧连接在一起。对于键值RDD,可以指定一个分区程序,以便具有相同键的数据点被洗牌到相同的执行器,因此连接更有效(如果在连接之前有与洗牌相关的操作)。可以在火花数据帧或数据集上做同样的事情吗?

共有2个答案

邢寒
2023-03-14

可以使用重新分区方法使用DataFrame/DataSet API。使用此方法,您可以指定一个或多个列用于数据分区,例如。

val df2 = df.repartition($"colA", $"colB")

也可以同时在同一命令中指定所需分区的数量,

val df2 = df.repartition(10, $"colA", $"colB")

注意:这并不能保证数据帧的分区位于同一个节点上,只是分区以相同的方式完成。

高墨一
2023-03-14

如果知道要多次加入数据帧,可以在加载数据帧后对其进行重新分区

val users = spark.read.load("/path/to/users").repartition('userId)

val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition

val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned

因此,它将对数据进行一次洗牌,然后在加入后续时间时重用洗牌文件。

然而,如果你知道你会在某些键上反复洗牌数据,最好的办法是将数据保存为带扣的表。这将把已经预哈希分区的数据写出来,因此当您读入并加入表时,可以避免混乱。你可以这样做:

// you need to pick a number of buckets that makes sense for your data
users.bucketBy(50, "userId").saveAsTable("users")
addresses.bucketBy(50, "userId").saveAsTable("addresses")

val users = spark.read.table("users")
val addresses = spark.read.table("addresses")

val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned

为了避免洗牌,桌子必须使用相同的扣板(例如相同数量的桶和桶柱上的连接)。

 类似资料:
  • 有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。

  • 使用Spark Dataset/DataFrame联接时,我面临长时间运行且OOM作业失败的问题。 以下是输入: ~10个不同大小的数据集,大部分是巨大的( 经过一番分析,我发现作业失败和缓慢的原因是歪斜键:当左侧有数百万条记录时,用连接键。 我用了一些蛮力的方法来解决这个问题,这里我想和大家分享一下。 如果您有更好的或任何内置的解决方案(针对常规Apache Spark),请与他人分享。

  • 我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢

  • 我们有没有可能在Spark中先按一列分区,然后再按另一列聚类? 在我的例子中,我在一个有数百万行的表中有一个< code>month列和一个< code>cust_id列。我可以说,当我将数据帧保存到hive表中,以便根据月份将该表分区,并按< code>cust_id将该表聚类成50个文件吗? 忽略按< code>cust_id的聚类,这里有三个不同的选项 第一种情况和最后一种情况在 Spark

  • null null 为什么要使用UDF/UADF而不是map(假设map保留在数据集表示中)?

  • 我正在尝试使用Spark数据集API,但在进行简单连接时遇到了一些问题。 假设我有两个带有字段的数据集:,那么在的情况下,我的连接如下所示: 但是,对于数据集,有一个。joinWith方法,但相同的方法不起作用: