当前位置: 首页 > 知识库问答 >
问题:

Spark中分组对RDD的最优划分数

关志
2023-03-14

我有两对结构为rdd[String,Int]的RDD,称为rdd1和rdd2。

val lvl1rdd=rdd1.groupByKey()
val lvl2rdd=rdd2.groupByKey()
val lvl1_lvl2=lvl1rdd.join(lvl2rdd)
val finalrdd=lvl1_lvl2.mapValues(value => function(value))

如果我加入前面的RDD,并在结果RDD(mapValues)的值上执行一个函数,那么所有的工作都将在一个worker中完成,而不是在集群的不同worker节点上分配不同的任务。我的意思是,期望的行为应该是在集群允许的这么多节点中并行执行作为参数传递给mapValues方法的函数。

共有1个答案

公羊凌
2023-03-14

1)避免使用groupByKey操作,因为它们是网络I/O和执行性能的瓶颈,在这种情况下使用reduceByKey操作,因为数据洗牌比groupByKey要少,如果是一个较大的数据集,我们可以更好地看到差异。

val lvl1rdd = rdd1.reduceByKey(x => function(x)) 
val lvl1rdd = rdd2.reduceByKey(x => function(x))
//perform the Join Operation on these resultant RDD's

在RDD上单独应用函数并将它们连接起来,远比使用groupByKey()连接RDD并应用函数要好得多

这也将确保任务分布在不同的执行器之间并并行执行

partition = key.hashCode() % numPartitions

这将创建固定数量的分区,当您使用groupByKey操作时,它可以超过intial数量。我们还可以自定义要创建的分区。例如

val result_rdd = rdd1.partitionBy(new HashPartitioner(2))

这将创建2个分区,通过这种方式,我们可以设置分区的数量。要决定最佳分区数,请参阅以下答案https://stackoverflow.com/a/40866286/7449292

 类似资料:
  • 我有两个RDDs。在Spark scala中,如果event1001RDD和event2009RDD具有相同的id,我该如何连接它们? Val事件1001RDD:模式RDD=[事件类型,id,位置,日期1] val event 2009 rdd:schemaRDD =[事件类型,id,日期1,日期2] 预期结果将是:(唯一)(按 id 排序) [事件类型,ID,1001 的位置,1001 的日期1

  • 假设我希望根据的对其进行分区。 通过覆盖方法对进行分区,并且只使用的hashcode是否正确? 但是,鉴于接受了许多分区参数,我不知道是否需要事先知道种类的数量,如果种类多于分区,会发生什么? 我的目标是打电话 并且在迭代器中只有具有相同的值。

  • 因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我

  • 假设我创建了这样一个RDD(我使用的是Pyspark): 然后我用方法打印分区元素并获得 Spark是如何决定如何划分我的列表的?元素的特定选择来自哪里?它可以以不同的方式耦合它们,只留下0和10以外的一些其他元素,以创建6个请求的分区。在第二次运行中,分区是相同的。 使用更大的范围,有29个元素,我得到2个元素后跟3个元素的模式的分区: 使用更小范围的9个元素,我得到 因此,我推断Spark是通

  • 在Spark流式传输中,是否可以将特定的RDD分区分配给集群中的特定节点(为了数据局部性?) 例如,我得到一个事件流[a,a,a,b,b],并有一个2节点的Spark集群。 我希望所有的a总是去节点1,所有的b总是去节点2。 谢啦!

  • 问题内容: 我对Apache Spark和Python比较陌生,想知道像我将要描述的东西是否可行? 我有一个格式为[m 1,m 2,m 3,m 4,m 5,m 6, … m n ]的RDD(运行rdd.collect()时会得到这个)。我想知道是否有可能将此RDD转换为[[m 1,m 2,m 3),(m 4,m 5,m 6).....(m n-2, m n-1,m n)]。内部元组的大小应为k。如