//x is the stream of (word, 1)
val x: DataStream[(String, Int)] = text
.flatMap(_.toLowerCase.split("\\W+"))
.map((_, 1))
//keyBy on the word field, what does the Tuple here mean in y
val y: KeyedStream[(String, Int), Tuple] = x.keyBy(0)
val z: DataStream[(String, Int)] = y.sum(1)
z.print
当您指定keyBy(0)时,您是通过流中元组的第一个元素来键控流,或者换句话说,您是通过字串来键控流。但是,编译器无法确定键是字符串,所以这个版本的keyBy总是将键视为包含某个对象(这是实际的键)的元组。
如果将keyBy重写为keyBy(_._1)
,那么编译器将能够推断密钥类型,并且y将是keyedstream[(String,Int),String]
,感觉应该更好。
对流进行键控的目的是对流进行分区,类似于SQL中groupBy将表拆分为不相交、不重叠的组。因此,在这种情况下,流(“A”,1),(B“,1),(C”,1),(A“,1),(C”,1),(C“,1)在逻辑上被分成三组:
("a",1), ("a",1)
("b",1)
("c",1), ("c",1), ("c",1)
然后,对每一个元组计算sum(1),其结果是通过将每一组中所有元组中的第二个字段相加来减少(在map/reduce意义下)每一个元组。因此,(“A”,1),(“A”,1)变成(“A”,2),以此类推。
与其使用z=y.sum(1)
,不如将其更完整地理解为
val z: DataStream[(String, Int)] = y.reduce(new ReduceFunction[(String, Int)] {
override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) =
(t1._1, t1._2 + t2._2)
})
如果运行代码,您可以准确地看到z是什么样子的。如果给它足够的资源,它可以在三个独立的线程中运行(因为有三个不同的键)。我刚才得到了这些结果:
3> (a,1)
2> (c,1)
1> (b,1)
2> (c,2)
2> (c,3)
3> (a,2)