当前位置: 首页 > 面试题库 >

Spark:将2元组键RDD与单键RDD结合在一起的最佳策略是什么?

於永寿
2023-03-14
问题内容

我要加入两个RDD,它们看起来像这样:

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]

碰巧的情况是的键值rdd1是唯一的,并且的元组键值rdd2也是唯一的。我想加入两个数据集,以便获得以下rdd:

val rdd_joined:RDD[((T,W), (U,V))]

实现此目的的最有效方法是什么?这是我想到的一些想法。

选项1:

val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})

选项2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)

选项1将收集所有数据以掌握,对吗?因此,如果rdd1很大(在我的情况下它相对较大,虽然比rdd2小一个数量级),但这似乎不是一个好选择。选项2做得很丑陋,而且笛卡尔积,看来效率也很低。我不曾想到(但尚未尝试过)的另一种可能性是执行选项1并广播地图,尽管最好以“智能”方式进行广播,以使地图的按键与菜单相同。的键rdd2。

有人遇到过这种情况吗?有您的想法我将很高兴。

谢谢!


问题答案:

一种选择是通过rdd1向驾驶员收集并将其广播给所有映射器来执行广播联接。如果正确完成,这将使我们避免大型rdd2RDD的昂贵改组:

val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))

val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value
  for {
    ((t, w), u) <- iter
    if m.contains(t)
  } yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)

该preservesPartitioning = true告诉星火此映射函数不修改的键rdd2; 这样,Spark可以避免rdd2对基于该(t, w)密钥加入的任何后续操作进行重新分区。

由于广播涉及驾驶员的通信瓶颈,因此广播效率可能很低。原则上,可以在不涉及驱动程序的情况下将一个RDD广播到另一个。我有一个原型,希望对此进行概括并添加到Spark中。

另一种选择是重新映射的键rdd2并使用Sparkjoin方法。这将涉及rdd2(可能rdd1)的全部改组:

rdd1.join(rdd2.map {
  case ((t, w), u) => (t, (w, u))
}).map {
  case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()

在我的示例输入中,这两种方法都产生相同的结果:

res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))

第三种选择是重组rdd2,t使其成为关键,然后执行上述连接。



 类似资料:
  • 定义如下: RDD是不可变的分布式对象集合 我不太明白这是什么意思。它像存储在硬盘上的数据(分区对象)吗?如果是这样,那么RDD为什么可以有用户定义的类(如java、scala或python) 通过此链接:https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch03.html它提到: 用户通过两种方

  • 假设我希望根据的对其进行分区。 通过覆盖方法对进行分区,并且只使用的hashcode是否正确? 但是,鉴于接受了许多分区参数,我不知道是否需要事先知道种类的数量,如果种类多于分区,会发生什么? 我的目标是打电话 并且在迭代器中只有具有相同的值。

  • 我有一个Spark RDD,其中每个元素都是形式的元组。我想使用方法将输入传递给外部可执行文件并生成形式的新RDD。我稍后需要键进行关联。 下面是使用火花壳的示例: 提前谢谢。

  • 我尝试创建一个JavaRDD,其中包含另一系列RDD。 RDD机器。foreach(机器- 第一:有没有可能这样做?如果没有,我可以用什么方式尝试做一些不同的事情? 让我展示一下我尝试做的事情: 我尝试在每台机器上启动我的算法,这台机器必须从Elasticsearch中的数据中学习。 因此,我尝试在每个“机器”中获取查询的所有数据。我的问题是:Spark有可能做到这一点吗?或者以其他方式?当我点燃

  • 主要内容:1.RDD特点:,2.RDD的 5大属性,3.RDD的执行原理,4.Spark的核心组件1.RDD特点: 可变: 存储的弹性 容错的弹性 计算的弹性 分片的弹性 RDD 代码中是一个抽象类, 代表弹性的, 不可变, 可分区, 里面的元素可并行计算的集合, 为弹性分布式数据集。 RDD 不保存数据, 但是有血缘关系。 不可变的是逻辑, 如果想加入新的逻辑, 必须封装。 2.RDD的 5大属性 分区列表 分区计算函数 多个RDD有依赖关系 分区器: 一个分区的规则, 和Kafka 类似

  • val celllookuprdd:[celltype,cellname](cellname有4个值) 预期结果:[id,1001的位置,1001的日期1,2009年的第一个转数日期,2009年的最后一个转数日期,2009年的第一个PPM日期,2009年的最后一个PPM日期] 这是我当前的查询(在这里,我还指示了一个可选的eventtype作为第一列;但是在我以前的event2009RDD中,我选