Scala aggregate

殷耀
2023-12-01

1.Spark function explained: aggregate
2.Example of the Scala aggregate function

1.Spark function explained: aggregate

Function signature:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

Definition from the official documentation:

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

Put differently: seqOp folds the elements of each partition into an accumulator of type U, starting from the zero value, and combOp then merges the per-partition accumulators (again starting from the zero value) into the final result. Because U may differ from the RDD's element type T, both operations are required.
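The two-step behaviour described above can be modelled without a cluster. The following is a minimal sketch in plain Scala, not Spark's implementation: aggregateLocally is a hypothetical helper introduced only for illustration, and each inner Seq stands in for one partition. It accumulates a (sum, count) pair, so the result type U = (Int, Int) differs from the element type T = Int.

```scala
// Hypothetical helper (not part of Spark): models aggregate on local data.
// Each inner Seq stands for one partition of an RDD[T].
def aggregateLocally[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U
): U = {
  // Step 1: fold every partition separately, each starting from zeroValue.
  val perPartition: Seq[U] = partitions.map(_.foldLeft(zeroValue)(seqOp))
  // Step 2: merge the per-partition results, again starting from zeroValue.
  perPartition.foldLeft(zeroValue)(combOp)
}

// T = Int, U = (Int, Int): accumulate (sum, count) to derive an average.
val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
val (sum, count) = aggregateLocally(partitions, (0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // merge one T into a U
  (a, b) => (a._1 + b._1, a._2 + b._2)    // merge two U's
)
println(s"sum=$sum count=$count avg=${sum.toDouble / count}") // sum=21 count=6 avg=3.5
```

Because (0, 0) is a neutral zero value for this pair of operations, the result does not depend on how the six numbers are split into partitions.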

Example 1:

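def seqOP(a: Int, b: Int): Int = {
    println("seqOp: " + a + "\t" + b)
    math.min(a, b)  // keep the smaller of the accumulator and the element
}
//seqOP: (a: Int, b: Int)Int

def combOP(a: Int, b: Int): Int = {
    println("combOP: " + a + "\t" + b)
    a + b  // add up the per-partition results
}
//combOP: (a: Int, b: Int)Int

val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)  // 2 partitions
z.aggregate(3)(seqOP, combOP)  // zero value is 3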
//output
seqOp: 3    1  //partition 1: seqOP(3,1)=>1
seqOp: 3    4  //partition 2: seqOP(3,4)=>3
seqOp: 1    2  //partition 1: seqOP(1,2)=>1
seqOp: 3    5  //partition 2: seqOP(3,5)=>3
seqOp: 1    3  //partition 1: seqOP(1,3)=>1
seqOp: 3    6  //partition 2: seqOP(3,6)=>3
combOP: 3   1  //combOP(3,1)=>4; 3: zero value, 1: partition 1's output
combOP: 4   3  //combOP(4,3)=>7; 3: partition 2's output

//final output: 7

Example 2:

val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
val zz = z.aggregate(3)(seqOP, combOP)
seqOp: 3    3
seqOp: 3    4
seqOp: 3    5
seqOp: 3    6
seqOp: 3    1
seqOp: 1    2

combOP: 3   3  //combOP(3,3)=>6; first 3: zero value, second 3: output of a partition whose result is 3
combOP: 6   3  //combOP(6,3)=>9; 3: output of the other partition whose result is 3
combOP: 9   1  //combOP(9,1)=>10; 1: output of the partition holding 1 and 2
//final output: 10
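Examples 1 and 2 use the same data and the same functions, yet return 7 and 10. The difference comes from the zero value 3, which is not neutral for these operations: it takes part in the min of every partition and once more in the final sum. The sketch below replays both runs in plain Scala. The helper aggregateLocally is hypothetical, and the sketch assumes the list is split into even, in-order slices, which is what the printed output of the two examples indicates.

```scala
// Hypothetical local model of aggregate(zeroValue)(min, +); no Spark needed.
def aggregateLocally(partitions: Seq[Seq[Int]], zeroValue: Int): Int = {
  // seqOp step: the minimum of each partition, seeded with zeroValue.
  val perPartition = partitions.map(_.foldLeft(zeroValue)((a, b) => math.min(a, b)))
  // combOp step: sum of the partition results, seeded with zeroValue again.
  perPartition.foldLeft(zeroValue)(_ + _)
}

val data = List(1, 2, 3, 4, 5, 6)
val twoSlices = data.grouped(3).toList   // assumed split: [1,2,3], [4,5,6]
val threeSlices = data.grouped(2).toList // assumed split: [1,2], [3,4], [5,6]

val r2 = aggregateLocally(twoSlices, 3)   // 3 + 1 + 3
val r3 = aggregateLocally(threeSlices, 3) // 3 + 1 + 3 + 3
println(r2) // 7
println(r3) // 10
```

With a zero value that is neutral for both functions, the result would not change with the number of partitions.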

Example 3:

def seqOp(a: String, b: String): String = {
    println("seqOp: " + a + "\t" + b)
    math.min(a.length, b.length).toString
}
//seqOp: (a: String, b: String)String

def combOp(a: String, b: String): String = {
    println("combOp: " + a + "\t" + b)
    a + b
}
//combOp: (a: String, b: String)String

val z = sc.parallelize(List("12", "23", "345", "4567"), 2)
z.aggregate("")(seqOp, combOp)
seqOp:      345   //partition holding "345","4567": seqOp("","345")=>"0"
seqOp:      12    //partition holding "12","23": seqOp("","12")=>"0"
seqOp: 0    4567  //seqOp("0","4567")=>"1"
seqOp: 0    23    //seqOp("0","23")=>"1"
combOp:     1     //combOp("","1")=>"1"
combOp: 1   1     //combOp("1","1")=>"11"
//res25: String = 11

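Notes:

1.The reduce function and the combine function must be commutative and associative; otherwise the result can vary with the partitioning and with the order in which partition results arrive.
2.As the signature of aggregate shows, the combine function's output type must be the same as its input type.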
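To see why commutativity matters, consider partial results that come back from the partitions in an unpredictable order. The sketch below is plain Scala with made-up partial results (the values "a", "b", "c" and 1, 2, 3 are illustrative only). It tries every arrival order and collects the distinct final values.

```scala
// Illustrative partial results, as they might come back from three partitions.
val stringPartials = Seq("a", "b", "c")
val intPartials = Seq(1, 2, 3)

// Combine the partials in every possible arrival order and keep the distinct outcomes.
val concatResults = stringPartials.permutations.map(_.foldLeft("")(_ + _)).toSet
val sumResults = intPartials.permutations.map(_.foldLeft(0)(_ + _)).toSet

println(concatResults.size) // 6: string concatenation is not commutative
println(sumResults)         // Set(6): addition gives one answer in any order
```

String concatenation, as used by combOp in Example 3, is associative but not commutative, so its result is only stable there because both partitions happen to produce the same string.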

2.Example of the Scala aggregate function

Let’s see if some ASCII art doesn’t help. Consider the type signature of aggregate:

def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B

With four elements of type A in the collection, aggregate might work like this:
z   A   z   A   z   A   z   A
 \ /     \ /seqop\ /     \ /    
  B       B       B       B
    \   /  combop   \   /
      B _           _ B
         \ combop  /
              B


Now I have a GenSeq("This", "is", "an", "example"), and I want to know how many characters there are in it. I can write the following:

import scala.collection.GenSeq
val seq = GenSeq("This","is","an","example")
val chars = seq.aggregate(0)(_ + _.length, _ + _)

So, first it would compute this:

0 + "This".length    //4
0 + "is".length      //2
0 + "an".length      //2
0 + "example".length //7

What it does next cannot be predicted (there is more than one way of combining the results), but it might do this (as in the ASCII art above):

4 + 2 // 6
2 + 7 // 9

At which point it concludes with
6 + 9 // 15
which gives the final result. Now, this is a bit similar in structure to foldLeft, but it has an additional function (B,B)=>B, which fold doesn’t have. This function, however, enables it to work in parallel.

Consider, for example, that the four initial computations are independent of each other and can be done in parallel. The next two (resulting in 6 and 9) can be started once the computations on which they depend are finished, and these two can also run in parallel.

The 7 computations, parallelized as above, could take as little time as 3 serial computations.

Actually, with such a small collection the cost in synchronizing computation would be big enough to wipe out any gains. Furthermore, if you folded this, it would only take 4 computations total. Once your collections get larger, however, you start to see some real gains.

Consider, on the other hand, foldLeft. Because it doesn’t have the additional function, it cannot parallelize any computation:

(((0 + "This".length) + "is".length) + "an".length) + "example".length

Each of the inner parentheses must be computed before the outer one can proceed.
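The contrast can be made concrete with a small sketch in plain Scala. It does not call aggregate itself, which is deprecated for sequential collections as of Scala 2.13. Instead it models the tree shape by hand: the chunk size of 2 is an arbitrary choice for illustration, and nothing here actually runs in parallel. It only shows which partial results are independent of each other.

```scala
val words = Seq("This", "is", "an", "example")

// Sequential: one chain of dependent additions, as in the foldLeft expression.
val sequential = words.foldLeft(0)(_ + _.length)

// Tree-shaped: fold each chunk of two words independently (the seqop step),
// then merge the partial sums (the combop step).
val partials = words.grouped(2).map(_.foldLeft(0)(_ + _.length)).toList
val combined = partials.reduce(_ + _)

println(sequential) // 15
println(partials)   // List(6, 9)
println(combined)   // 15
```

Both routes give 15, but only the second exposes partial sums (6 and 9) that could be computed at the same time.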
