当前位置: 首页 > 知识库问答 >
问题:

Spark聚合函数-aggregateByKey是如何工作的?

程祺
2023-03-14
**row   | id,  c.**
---------------
row1  | k1 , c1  
row2  | k1 , c2  
row3  | k1 , c3  
row4  | k2 , c4  
row5  | k2 , c5  
row6  | k2 , c6  
row7  | k3 , c7  
row8  | k3 , c8  
row9  | k3 , c9  
row10 | k4 , c10   
row11 | k4 , c11  
row12 | k4 , c12 
    null

共有1个答案

东方华荣
2023-03-14

aggregateByKey()与ReduceByKey有很大不同。发生的情况是reduceByKey是AggregateByKey的一种特殊情况。

aggregateByKey()将组合特定键的值,这种组合的结果可以是您指定的任何对象。您必须指定如何在一个分区(在同一节点中执行)内组合(“Added”)值,以及如何组合来自不同分区(可能在不同节点中)的结果。reduceByKey是一个特例,因为组合的结果(例如和)与值的类型相同,从不同分区组合的操作也与在分区内组合值的操作相同。

举个例子:假设你有一个对的列表。你把它并行化:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))
import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))
 类似资料:
  • 我需要聚合一个基于1分钟时间间隔的数据集。当我尝试此操作时,它会抛出错误: 我的数据集如下所示 org.apache.spark.sql.AnalysisException:无法解析(datetime,value)中的列名“60秒”;在org.apache.spark.sql.dataset$$anonfun$resolve$1.apply(dataset.scala:216)在org.apach

  • 一、简单聚合 1.1 数据准备 // 需要导入 spark sql 内置的函数包 import org.apache.spark.sql.functions._ val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate() val empDF = spark.read.jso

  • 问题内容: 我只是在学习MySQL-是否有组合(或嵌套)聚合函数的方法? 给定一个查询: 这将给我每个用户回答的问题数量。我真正想要的是每个用户回答的平均问题数量…… 计算此统计信息的正确方法是什么? 如果有可能,是否有办法针对每个问题分解此统计信息?(用户可以多次回答相同的问题)。就像是: 问题答案: 您必须使用子查询: 您不能将一个聚合与另一个聚合一起包装。如果MySQL支持分析/排序/窗口功

  • 在Spark中是如何工作的? 如果我们注册一个对象作为一个表,会将所有数据保存在内存中吗?

  • 我有一个配置单元表,只有很少的bigint或string列,超过3800万行,总大小略大于1GB,测试环境是一个小型独立集群,有4个工作节点,每个都有8GB内存,Spark 1.4。在Spark sql shell中,我尝试执行一个sql 有好几次,工作总是停留在第一阶段,几乎没有任务悬而未决。 GC报告似乎表明没有足够的内存来存储临时对象,进程正在等待完整的GC完成。 一个挂起节点的GC输出:

  • 我正在考虑将dataset1分解为每个“T”类型的多个记录,然后与DataSet2连接。但是你能给我一个更好的方法,如果数据集变大了,它不会影响性能吗?