val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.readTextFile("dataset.txt")
.map(transformStream(_))
.assignAscendingTimestamps(_.eventTime)
.keyBy(_.id)
.timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))
def transformStream(input: String): EventStream = {...}
case class EventStream(val eventTime: Long, val id: String, actualEvent: String)
我想做的是对每个窗口批处理的每个分区应用一个通用函数,也许应用一个复杂的处理算法或类似的东西。我已经在DataStream API中看到了该方法的应用,但我不理解它是如何工作的。在Flink API中,它说它的用法与Scala中的用法相同:
inputStream.apply { WindowFunction }
有人能解释一下apply方法是做什么的或者是如何使用的吗?Scala中的示例会更好。apply方法是否符合我的要求?
所以基本上有两个可能的方向,根据你想要做的计算类型。可以使用:fold
/reduce
/aggrege
或更多您已经提到的泛型-apply
。所有这些应用于windows的密钥。
对于apply
,它是应用计算的一种非常通用的方法。最基本的版本(在Scala中)是:
def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R]
其中函数采用4个参数:
val stream: DataStream[(String,Int)] = ...
stream.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
.apply((e1, e2) => (e1._1, e1._2 + e2._2),
(key, window, in, out: Collector[(String, Long, Long, Int)]) => {
out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
})
有人尝试在Apache Flink中使用DynamoDB流吗? Flink有一个Kinesis消费者。但是我正在寻找如何直接使用Dynamo流。 我试了很多次,但什么也没找到。然而,Flink Jira董事会发现一个未决请求。所以我想这个选项还不可用?我有什么选择? 允许FlinkKinesisConsumer适应AWS DynamoDB流
如果我想在Flink中分裂一个流,那么最好的方法是什么?
在Flink中,我有一个键控流,我正在对它应用一个进程函数。 我的键选择器看起来像这样... FooBarProcessFunction看起来像这样... 在FooBarProcessFunction中,我希望获得由MyKeySelector的getKey方法创建的键。那可行吗? 目前,我正在使用一种变通方法,其中我基本上在processElement函数中重新创建键。但如果我能避免这样做,那就太
我正在编写一个Flink流程序,其中我需要使用一些静态数据集(信息库,IB)来丰富用户事件的数据流。 对于例如。假设我们有一个买家的静态数据集,并且我们有一个事件的clickstream,对于每个事件,我们要添加一个布尔标志,指示事件的实施者是否是买家。 另一个选择可以是使用托管操作员状态来存储购买者设置,但是我如何保持按用户id分配的该状态,以避免在单个事件查找中使用网络I/O呢?在内存状态后端