当前位置: 首页 > 知识库问答 >
问题:

(火花偏斜连接)如何在没有内存问题的情况下加入两个具有高度重复密钥的大型Spark RDD?

柴飞扬
2023-03-14

在前面的问题中,我试图通过避免使用加入来避免Spark加入的内存问题。

在这个新问题中,我使用join,但试图解决内存问题。

这是我的两个rdd:

> < Li > < p > productToCustomerRDD:< br > Size:非常大,可能有数百万个不同的键< br >使用< code>HashPartitioner在键上分区< br >有些键会高度重复,有些则不会。

(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)

productToCountRDD:
大小:非常大,可能有数百万个不同的键,大到无法使用< code>HashPartitioner对键进行< code >广播
分区< code >键是唯一的,值是购买该产品的客户数量。

(toast, 2)
(butter, 1)
(jelly, 1)

我想加入这两个RDD,结果将是:

客户到产品并计数RDD:

(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))

如果我使用<code>productToCustomerRDD连接两个RDD。join(productToCountRDD)我在两个分区(数千个分区中)上得到一个OutOfMemoryError。在Spark UI中,我注意到在包含<code>join</code>的阶段,在<code>Input Size/Records</code>列中,所有分区都有4K到700K的记录。除了产生OOM的两个分区之外,所有分区都有:一个有9M条记录,一个有6M条记录。

按照我的理解,为了加入,具有相同键的对需要被洗牌并移动到相同的分区(除非它们之前是按键分区的)。但是,由于一些键非常频繁(例如:数据集中几乎每个客户都购买的产品),大量数据可能会在< code>join期间或在join之前的< code>repartition期间移动到一个分区。

我的理解正确吗
有办法避免这种情况吗
有没有一种方法可以加入,而不需要在同一分区上拥有一个严重重复的键的所有数据?

共有2个答案

夏侯俊美
2023-03-14

我的第一个问题是:你真的需要这些详细的数据吗?你真的需要知道jhon买了2只蟾蜍等等吗?我们处于大数据环境中,我们处理大量数据,因此有时聚合是减少基数并在分析和性能方面获得良好结果的一件好事。因此,如果你想知道一个产品售出了多少次,你可以使用一个pairRDD(product,count)[这样一来,每个产品就有一个元素],或者如果你想了解用户的偏好,你可以用一个pair RDD(user,购买产品的列表)[这样,每个用户就有一种元素]。如果你真的需要知道吐司是从Jhon那里买的,为什么要把吐司键拆分成不同的重新分区?这样,您就无法计算全局结果,因为在每个块中,您只有一条关于键的信息。

慎峻
2023-03-14

实际上,这是Spark中的一个标准问题,称为“歪斜连接”:连接的一侧是歪斜的,这意味着它的一些键比其他键更频繁。这里可以找到一些对我没用的答案。

我使用的策略受到此处定义的 GraphFrame.skewedJoin() 方法及其在 ConnectedComponents.skewedJoin() 中的使用启发。将通过使用广播联接连接最常用的键和使用标准联接连接频率较低的键来执行联接。

在我的示例(OP)中,productToCountRDD已经包含有关密钥频率的信息。所以它是这样的:

  • 过滤productToCountRDD以仅保留高于固定阈值的计数,并对驱动程序进行
  • 将此地图广播给所有执行者。
  • productTo搬到客户端RDD拆分成两个RDD:在广播映射中找到的密钥(频繁密钥)和不在广播映射中的密钥(不频繁密钥)。
  • 频繁键的连接使用mapToPair执行,从广播映射中获取计数
  • 不频繁键的连接使用加入执行。
  • 最后使用联合来获取完整的RDD。

 类似资料:
  • 问题内容: 假定派生自以下类的类: 如果我想在两个初始化器中都使用相同的代码,例如 并且 不要 在类实现中 重复 两次 该代码 ,我将如何构造方法? 尝试的方法: 创建一个在-> Swift编译器在调用之前给出有关未初始化变量的错误之后调用的方法 之前的调用显然失败,并出现编译器错误 “ super.init调用之前使用了’self’ 问题答案: 正如GoZoner所说,将变量标记为可选将起作用。

  • 如何在Azure密钥库中设置秘密,而不使用PowerShell。我们正在使用Azure Key Vault来安全地存储连接字符串和一些其他应用程序秘密。我们可以使用PowerShell脚本添加秘密,但我想知道是否有其他方法可以在Azure KeyVault中添加密钥,最好是使用API。我们实际上需要提供一个管理工具,应用程序管理员可以使用该工具在密钥库中添加/修改机密。

  • 这两个dataframe没有任何公共列。两个数据流中的行数和列数也不同。我尝试插入一个新的虚拟列,以增加row_index值,如下val dfr=df1.withcolumn(“row_index”,monotonically_increasing_id())所示。 但由于我使用的是Spark2,因此不支持monotonically_increasing_id方法。有没有办法把两个datafram

  • 问题内容: 示例问题: 实体: 用户包含姓名和朋友列表(用户参考) 博客文章包含标题,内容,日期和作者(用户) 需求: 我想要一个显示标题的页面以及指向用户朋友的最近10篇博客的链接。我还希望能够通过较旧的条目继续进行分页。 SQL解决方案: 因此在sql land中,它将类似于: 我能想到的GAE解决方案是: 加载用户,循环浏览好友列表并加载其最新博客帖子。最后合并所有博客文章以查找最新的10个

  • 我有两个大的Hive表,我想用spark.sql将它们连接起来。表格采用snappy格式,在Hive中存储为拼花文件。 我想加入它们并对某些列进行一些聚合,假设计算所有行和一列的平均值(例如 doubleColumn),同时使用两个条件进行过滤(假设在 col1,col2 上)。 注意:我在一台机器上进行测试安装(虽然功能非常强大)。我希望集群中的性能可能会有所不同。 我的第一个尝试是使用spar

  • 在我的响应中,有一个,没有任何。那么,我如何获取该响应呢? 这是我的JSON代码-