当前位置: 首页 > 面试题库 >

什么是引起Shuffle的Spark转换?

闾丘文昌
2023-03-14
问题内容

我很难在Spark文档中找到导致随机播放的操作,而不会导致随机播放的操作。在此列表中,哪些会导致洗牌,哪些不会导致洗牌?

地图和过滤器没有。但是,我不确定其他人。

map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks]))
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)

问题答案:

没有文档,实际上很容易发现这一点。对于这些功能中的任何一个,只需创建一个RDD并调用调试字符串,这是一个示例,您可以自己完成其余的工作。

scala> val a  = sc.parallelize(Array(1,2,3)).distinct
scala> a.toDebugString
MappedRDD[5] at distinct at <console>:12 (1 partitions)
  MapPartitionsRDD[4] at distinct at <console>:12 (1 partitions)
    **ShuffledRDD[3] at distinct at <console>:12 (1 partitions)**
      MapPartitionsRDD[2] at distinct at <console>:12 (1 partitions)
        MappedRDD[1] at distinct at <console>:12 (1 partitions)
          ParallelCollectionRDD[0] at parallelize at <console>:12 (1 partitions)

如您所见,distinct创建了一个洗牌。发现这种方式而不是docs尤其重要,因为在某些情况下某些功能将需要或不需要改组。例如,join通常需要进行改组,但是如果您将两个RDD合并,则来自同一RDD
Spark的分支有时可能会取消改组。



 类似资料:
  • 问题内容: 关于Java的InterruptedException有一些有趣的问题和答案,例如Java中的InterruptedException 的原因和处理InterruptedException。但是,它们都没有告诉我InterruptedException的可能来源。 像SIGTERM,SIGQUIT,SIGINT这样的OS信号呢?在命令行上按CTRL-C是否会产生InterruptedE

  • 定义如下: RDD是不可变的分布式对象集合 我不太明白这是什么意思。它像存储在硬盘上的数据(分区对象)吗?如果是这样,那么RDD为什么可以有用户定义的类(如java、scala或python) 通过此链接:https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch03.html它提到: 用户通过两种方

  • 从评论来看: 有人能解释一下它是如何有效的吗?仅仅是它避免拳击的事实就足够了吗?

  • 问题内容: Elasticsearch中的索引是什么?一个应用程序有多个索引还是只有一个索引?假设您为某些汽车制造商构建了一个系统。它涉及人员,汽车,零件等。您是否有一个名为制造商的索引,或者您有一个人的索引,一个用于汽车的索引和一个用于零备件的索引?有人可以解释吗? 问题答案: 很好的问题,答案比人们期望的要细腻得多。您可以将索引用于几种不同的目的。 关系指标 最简单,最熟悉的布局将克隆您从关系