当前位置: 首页 > 知识库问答 >
问题:

Java Spark 广播并加入两个 RDD

张淳
2023-03-14

我有一个大表<code>JavaPairRDD

// I ignored the parsing part, just simplified it as loading from the files. 
JavaPairRDD<String, MySchema> RDD1 = sc.textFile ("path_to_small_dataset");
JavaPairRDD<String, Double> RDD2 = sc.textFile("path_to_large_dataset"); 

// Broadcast RDD2
Set<Tuple2<String, Double>> set2 = new HashSet<>();
set2.addAll(RDD2.collect());

// now I have set2 and RDD1, how can I join them? 

共有1个答案

白侯林
2023-03-14

假设您有两个要加入的RDD,第一个RDD足够小,可以容纳每个工作线程的内存(smallRDD),第二个RDD根本不需要重新排序(largeRDD)。

在加入之前,您必须确保将大型RDD[T]转换为RDD[(key,T)]。键表示联接操作期间使用的列。

这段代码应该可以在Scala中实现(但基本原理在Java中是一样的)

val smallLookup = sc.broadcast(smallRDD.collect.toMap)
largeRDD.flatMap { case(key, value) =>
  smallLookup.value.get(key).map { otherValue =>
  (key, (value, otherValue))
 }
}

我希望它有所帮助

 类似资料:
  • 问题内容: 我在加入熊猫方面遇到问题,并且试图找出问题所在。假设我有一个x: 我应该能够通过简单的连接命令在y = x上将y与索引上的y联接,除了同名具有+2。 我希望决赛对双方都有1941个非值。我也尝试过合并,但是我有同样的问题。 我以为正确的答案是pandas.concat([x,y]),但这也不符合我的预期。 编辑:如果您在加入方面遇到问题,请阅读下面的韦斯答案。我有一个重复的时间戳。 问

  • 我有一张小桌子(2k)的记录和一张大桌子(5 mil)的记录。我需要从小表中获取所有数据,只从大表中获取匹配数据,因此我在下面执行了查询

  • 输出如下: 如果两个数组的维数不相同,则元素到元素的操作是不可能的。 然而,在 NumPy 中仍然可以对形状不相似的数组进行操作,因为它拥有广播功能。 较小的数组会广播到较大数组的大小,以便使它们的形状可兼容。 如果满足以下规则,可以进行广播: 如果输入在每个维度中的大小与输出大小匹配,或其值正好为 1,则在计算中可它。 如果上述规则产生有效结果,并且满足以下条件之一,那么数组被称为可广播的。 数

  • 原文:Broadcasting 另见:numpy.broadcast 术语广播描述了NumPy在算术运算时如何处理不同形状的数组。 在某些条件下,较小的数组“广播”成较大的数组以便有相同的形状。 广播提供了一种矢量化操作数组的方法,这样可以在C而不是Python中进行循环。 它可以在不制作不必要的数据副本的情况下实现这一点,并且通常可以实现高效 然而,有些情况下广播是一个坏主意,因为它会导致内存使

  • 问题内容: 我有两个实体类Category和Events。我需要连接两个表并获取与给定条件匹配的所有记录 我的SQL查询 我怎样才能将此SQL查询转换为hql并获取数据? 我在下面尝试了但没有得到结果?hibernate很新 用于hibernate映射的事件实体类 类别实体 提取类别方法 问题答案: 使用ORM工具时,您需要考虑Java对象。 根据您的问题,我认为您要编写的查询看起来像: 使用OR

  • 问题内容: 我有一堆添加到的生产者线程和一个接收对象的工作线程。现在,我想扩展它,以使两个工作线程可以接收对象,但是对对象执行不同的工作。这是一个转折: 我希望 两个 接收线程都处理已放在队列中的对象。 如果我继续使用BlockingQueue,则两个线程将争用对象,只有一个工作线程将获取对象。 因此,我正在寻找类似于BlockingQueue的东西,但是具有广播行为。 应用程序:生产者线程实际上