我正在学习Flink,其中一件令我困惑的事情是使用一个名为Collector的对象。例如在平面图函数中。这个Collector和它的方法收集了什么?以及为什么例如map函数不需要通过显式使用它来传递结果?
这里可以看到在flatmap函数中使用收集器的一些示例:https://www.programcreek.com/scala/org.apache.flink.util.Collector
此外,如果我搜索收集器在Flink体系结构中的位置,我找不到任何具有该映射的图
如您所知,如果您希望一个工件在数据流中生成N个输出,您可以使用收集器将输出数据封装在flatmap中,相反,Map通常生成一对一的数据,因此不需要使用它。从某种意义上说,收集器具有广泛的内部应用程序。你可以看看org。阿帕奇。Flink。流式处理。api。操作员。输出(从收集器扩展)\r组织。阿帕奇。Flink。运行时。操作员。运输。OutputCollector,通常用于收集记录并将其发送给writer。等等,收集需要写入数据时调用。
示例(不一定准确):
flatMap的Scala源代码有三种定义。让我们看看第一个的定义。
/**
* Creates a new DataStream by applying the given function to every element and flattening
* the results.
*/
def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
val cleanFun = clean(fun)
val flatMapper = new FlatMapFunction[T, R] {
def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
}
flatMap(flatMapper)
}
使用此方法的示例如下:
text.flatMap((input: String, out: Collector[String]) => {
input.split(" ").foreach(out.collect)
})
在这种方法中,我们需要通过收集器手动发送数据
然后让我们看看源代码中的第二个定义:
/**
* Creates a new DataStream by applying the given function to every element and flattening
* the results.
*/
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
val cleanFun = clean(fun)
val flatMapper = new FlatMapFunction[T, R] {
def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
}
flatMap(flatMapper)
}
这里我们不使用Collector来收集输出,而是直接输出一个列表,Flink帮助我们将列表展平。使用Traversableonce也会导致我们无论如何都要返回一个列表,即使它是一个空列表,否则我们无法匹配函数的定义。
text.flatMap(input => {
if (input.size > 15) {
input.split(" ")
} else {
Seq.empty
}
})
你可以找到很多类似的地方,只要是关于发送数据记录的,你几乎可以看到Collector。
Flink将Collector传递给任何可能发出任意数量流元素的用户函数。map函数不使用Collector,因为它执行一对一的转换,map函数的返回值是输出。而平面图可以为每个事件发出零、一个或多个流元素,这使得Collector成为适应这一点的方便方式。
问题内容: 我是Java新手,对Java中的垃圾收集器感到困惑。它实际上是做什么的,什么时候生效。请描述Java中垃圾收集器的一些属性。 问题答案: 该垃圾收集器是运行在一个程序的Java虚拟机,其摆脱其未使用的Java应用程序了对象。它是自动内存管理的一种形式。 当典型的Java应用程序运行时,它正在创建新的对象,例如和,但是在一段时间之后,这些对象将不再使用。例如,看下面的代码: 在上面的代码
问题内容: 我对中的功能有些困惑。例如,为什么下面的试验(一致地)执行它们的工作? 我找不到关于此的好的文档。 问题答案: 伪随机数生成器通过对值执行某些运算来工作。通常,此值是生成器生成的先前编号。但是,第一次使用生成器时,没有先前的值。 播种伪随机数生成器会为其提供第一个“上一个”值。每个种子值将对应于给定随机数生成器的一系列生成值。也就是说,如果两次提供相同的种子,则两次获得相同的数字序列。
我一直在努力学习什么是EJB bean,这意味着他们的实例在池中被管理,等等。真的不能很好地掌握它们。 你能给我解释一下它们到底是什么吗(实际上对于一个Java程序员来说)?他们是做什么的?他们的目的是什么?为什么要真正使用它们?(为什么不坚持?)也许是一个示例应用程序? 请仅参考更新的信息,即。关于EJB的过时信息可能具有误导性。 对于EJB学习初学者,请注意: EJB基于分布式对象,这是指运行
我很难理解流,以workcount为例,对于像Kafka这样的无限源,“sum”到底是做什么的? 我有点理解有时间窗的情况,因为它有开始和结束时间,对我来说就像一个“批次”,但如果没有时间窗, 什么是开始时间和结束时间
对数据库进行查询和修改操作的语言叫做 SQL(Structured Query Language,结构化查询语言)。SQL 语言是目前广泛使用的关系数据库标准语言,是各种数据库交互方式的基础。 著名的大型商用数据库 Oracle、DB2、Sybase、SQL Server,开源的数据库 PostgreSQL、MySQL,甚至一些小型的数据库 Access 等都支持 SQL。近些年蓬勃发展的 NoS
本文向大家介绍你有了解Rxjs是什么吗?它是做什么的?相关面试题,主要包含被问及你有了解Rxjs是什么吗?它是做什么的?时的应答技巧和注意事项,需要的朋友参考一下 RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extra