当前位置: 首页 > 知识库问答 >
问题:

如果我打算按键执行大量聚合,我应该选择RDD而不是数据集/数据帧吗?

盖和洽
2023-03-14

我有一个用例,我打算按键分组,同时在列上聚合。我正在使用Dataset,并试图通过使用groupBy和agg来实现这些操作。以下面的场景为例

case class Result(deptId:String,locations:Seq[String])
case class Department(deptId:String,location:String)

// using spark 2.0.2
// I have a Dataset `ds` of type Department   

+-------+--------------------+
|deptId |      location      |
+-------+--------------------+
|     d1|delhi               |            
|     d1|mumbai              |
|    dp2|calcutta            |
|    dp2|hyderabad           |       
+-------+--------------------+

我打算把它转换成

// Dataset `result` of type Result

+-------+--------------------+
|deptId |      locations     |
+-------+--------------------+
|     d1|[delhi,mumbai]      |            
|    dp2|[calcutta,hyderabad]|            
+-------+--------------------+

为此,我在stack上搜索并找到以下内容:

val flatten = udf(
  (xs: Seq[Seq[String]]) => xs.flatten)

val result = ds.groupBy("deptId").
                agg(flatten(collect_list("location")).as("locations")

以上对我来说似乎很整洁。

  1. 但在搜索上述内容之前,我首先搜索了数据集是否像RDD一样具有内置的reduceByKey。但找不到,所以选择了上面的。但我读了这篇文章《grouByKey vs reduceByKey》,才知道reduceByKey的洗牌次数更少,效率更高。我问这个问题的第一个原因是什么,我应该在我的场景中选择RDD吗

PS:如果我使用了一些错误的术语,请原谅。

共有1个答案

高宸
2023-03-14

要回答您的一些问题:

>

在Spark的数据集与DataFrame中,静态类型检查和性能之间有一个重要的权衡-Spark 2.0数据集与DataFrame

链接帖子特别建议不要使用UserDefinedAggregateFunction(而不是UserDefinedFunction),因为过度复制数据-引发以ArrayType为缓冲模式的UDAF性能问题

您甚至不需要用户定义函数,因为在您的情况下不需要展平:

val df = Seq[Department]().toDF

df.groupBy("deptId").agg(collect_list("location").as("locations"))

这就是你应该追求的。

静态类型的等价物是

val ds = Seq[Department]().toDS

ds
  .groupByKey(_.deptId)
  .mapGroups { case (deptId, xs) => Result(deptId, xs.map(_.location).toSeq) }

比数据帧选项贵得多。

 类似资料:
  • 大家好,首先,根据标题,有人可能会说这个问题已经得到了回答,但我的观点是比较特定于数据集和RDD API的RedueBykey、GroupBykey性能。我在许多帖子中看到,RedueBykey方法的性能比GroupByKey更有效,当然我同意这一点。尽管如此,我有点困惑,我无法弄清楚如果我们使用数据集或RDD,这些方法的行为如何。每种情况下应该使用哪一个? 我会尽量说得更具体一些,因此我会提供我

  • 我正在考虑将dataset1分解为每个“T”类型的多个记录,然后与DataSet2连接。但是你能给我一个更好的方法,如果数据集变大了,它不会影响性能吗?

  • 好吧,我对使用Scala/Spark还比较陌生,我想知道是否有一种设计模式可以在流媒体应用程序中使用大量数据帧(几个100k)? 在我的示例中,我有一个SparkStreaming应用程序,其消息负载类似于: 因此,当用户id为123的消息传入时,我需要使用特定于相关用户的SparkSQL拉入一些外部数据,并将其本地缓存,然后执行一些额外的计算,然后将新数据持久保存到数据库中。然后对流外传入的每条

  • 我正在尝试通过ID和日期聚合数据帧。假设我有一个DataFrame: 我想通过ID和日期(频率=1W)聚合该值,并得到一个dataframe如下所示: 我理解它可以通过迭代ID并使用grouper聚合价格来实现。有没有更有效的方法不迭代IDS?多谢。

  • 问题内容: 在为数据库(例如MySQL)设计模式时,会出现一个问题,即是否要完全规范化表。 一方面,联接(以及外键约束等)非常慢,另一方面,您会获得冗余数据和潜在的不一致情况。 这里“最优化”是正确的方法吗?即创建一个书本归一化数据库,然后查看可以进行归一化以实现最佳速度增益的内容。 对于这种方法,我的担心是,我将选择一个可能不够快的数据库设计- 但是在那个阶段重构模式(同时支持现有数据)将非常痛

  • 我试图弄清楚如何合并/连接两个数据帧,这样做的方式是:如果满足某一条件,R将两个数据帧中的两行合并成一行,但如果不满足该条件,R在原始数据帧中不存在的列中添加一个带有NAs的新行。我不清楚这是否是一个比我想象的更简单的连接,但我一直无法弄清楚如何做到这一点,甚至在阅读了一些堆栈溢出结果(例如或例如)之后。 下面是两个示例数据帧: null 在df3中创建一行,其中“name_df1”、“name_