我正在构建一个具有以下目标的 Flink 应用程序:
通过滚动窗口,我已经能够实现上述目标,但在会话窗口中我没有成功。
我的窗口处理代码如下
package io.github.streamingwithflink.jv
import java.util.{Calendar}
import io.github.streamingwithflink.util.{MySensorSource, SensorReading, SensorTimeAssigner}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.{TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.util.Random
object MySessionWindow {
def main(args: Array[String]): Unit = {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1000L)
// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
.addSource(new MySensorSource)
// assign timestamps and watermarks which are required for event time
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
val sessionizedEvents = sensorData
.keyBy(_.id)
// a session window with 1.5 second gap
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1500)))
// a custom trigger that fires every event received
.trigger(new MyTrigger)
// count readings per window
.process(new MySessionWindowFunction)
sessionizedEvents.print()
// retrieve and print session output
val sessionOutput: DataStreamSink[String] = sessionizedEvents
.getSideOutput(new OutputTag[String]("session-status"))
.print()
env.execute()
}
}
/** A trigger that fires with every event placed in window. */
class MyTrigger
extends Trigger[SensorReading, TimeWindow] {
override def onElement(
r: SensorReading,
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
if (timestamp >= window.getEnd) {
TriggerResult.FIRE_AND_PURGE
}
else {
TriggerResult.FIRE
}
}
override def onEventTime(
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
// Continue. not using event time timers
TriggerResult.CONTINUE
}
override def onProcessingTime(
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
// Continue. We don't use processing time timers
TriggerResult.CONTINUE
}
override def canMerge: Boolean = {
return true
}
override def onMerge(
window: TimeWindow,
ctx: Trigger.OnMergeContext) = {
}
override def clear(
window: TimeWindow,
ctx: Trigger.TriggerContext): Unit = {
// No trigger state to clear
}
}
/** A window function that counts the readings per sensor and window.
* The function emits the sensor id, session reference and temperature . */
class MySessionWindowFunction
extends ProcessWindowFunction[SensorReading, (String, Int, Double), String, TimeWindow] {
override def process(
key: String,
ctx: Context,
readings: Iterable[SensorReading],
out: Collector[(String, Int, Double)]): Unit = {
// count readings
val cnt = readings.count(_ => true)
val curTime = Calendar.getInstance.getTimeInMillis
val lastTime = readings.last.timestamp
val sessionRefDesc = new ValueStateDescriptor[Int]("sessionRef", classOf[Int])
val sessionRef = ctx.windowState.getState[Int](sessionRefDesc)
val sessionCountDesc = new ValueStateDescriptor[Int]("sessionCount", classOf[Int])
val sessionCount = ctx.windowState.getState[Int](sessionCountDesc)
// Side output for session
val sessionStatus: OutputTag[String] =
new OutputTag[String]("session-status")
// create a new sessionRef every time new window starts
if (cnt == 1) {
// set sessionRef for first element
val sessionRefValue = new Random().nextInt(998) + 1
sessionRef.update(sessionRefValue)
ctx.output(sessionStatus, s"Session opened: ${readings.last.id}, ref:${sessionRef.value()}")
}
sessionCount.update(cnt)
out.collect((readings.last.id, sessionRef.value(), readings.last.temperature))
}
override def clear(
ctx: Context): Unit = {
// Clearing window session context
val sessionRefDesc = new ValueStateDescriptor[Int]("sessionRef", classOf[Int])
val sessionRef = ctx.windowState.getState[Int](sessionRefDesc)
val sessionCountDesc = new ValueStateDescriptor[Int]("sessionCount", classOf[Int])
val sessionCount = ctx.windowState.getState[Int](sessionCountDesc)
// println(s"Clearing sessionRef ${sessionRef.value()}")
// Side output for session
val sessionOutput: OutputTag[String] =
new OutputTag[String]("session-status")
ctx.output(sessionOutput, s"Session closed: ref:${sessionRef.value()}, count:${sessionCount.value()}")
sessionRef.clear()
sessionCount.clear()
super.clear(ctx)
}
}
来生成我正在使用的输入
package io.github.streamingwithflink.util
import java.util.Calendar
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import scala.util.Random
/**
* Flink SourceFunction to generate SensorReadings with random temperature values.
*
* Each parallel instance of the source simulates 1 sensor which emit one sensor
* reading spaced by a progressive delay capped at 3 seconds (1,2,3,1,2,3,1...)
*/
class MySensorSource extends RichSourceFunction[SensorReading] {
// flag indicating whether source is still running.
var running: Boolean = true
/** run() continuously emits SensorReadings by emitting them through the SourceContext. */
override def run(srcCtx: SourceContext[SensorReading]): Unit = {
// initialize random number generator
val rand = new Random()
// look up index of this parallel task
val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask
// initialize sensor ids and temperatures
var curFTemp = (1 to 1).map { // Slow
i => ("sensor_" + (taskIdx * 10 + i), 65 + (rand.nextGaussian() * 20));
}
// curFTemp.foreach(t => System.out.println(t._1, t._2))
// emit data until being canceled
var waitTime = 0;
while (running) {
// Progressive 1s delay, with 3s max: 1,2,3,1,2,3,1...
waitTime = (waitTime) % 3000 + 1000
// update temperature
curFTemp = curFTemp.map( t => (t._1, t._2 + rand.nextGaussian() * 0.5) )
// get current time
val curTime = Calendar.getInstance.getTimeInMillis
// emit new SensorReading
curFTemp.foreach({t => srcCtx.collect(SensorReading(t._1, curTime, t._2))})
// curFTemp.foreach(t => println(s"TX: id:${t._1}, ts:${curTime}, temp:${t._2}"))
Thread.sleep(waitTime)
}
}
/** Cancels this SourceFunction. */
override def cancel(): Unit = {
running = false
}
}
和
case class SensorReading(id: String, timestamp: Long, temperature: Double)
当我使用会话窗口执行时,会出现以下异常:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645)
at io.github.streamingwithflink.jv.MySessionWindow$.main(MySessionWindow.scala:55)
at io.github.streamingwithflink.jv.MySessionWindow.main(MySessionWindow.scala)
Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$MergingWindowStateStore.getState(WindowOperator.java:678)
at io.github.streamingwithflink.jv.MySessionWindowFunction.process(MySessionWindow.scala:126)
at io.github.streamingwithflink.jv.MySessionWindowFunction.process(MySessionWindow.scala:111)
at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:370)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
希望我错过了一个技巧,因为无法使用会话窗口存储状态感觉非常严格。
任何指针将不胜感激。
会话窗口确实相当特殊。当每个新事件到达时,它首先被分配到自己的窗口,然后处理所有当前会话窗口的集合,并执行任何可能的合并(基于会话间隙)。这种方法意味着,对于给定事件所属的会话,实际上没有一个稳定的概念,并且它使得每窗口状态的概念相当尴尬——而且不受支持。
您可能能够使用基于 globalState
的会话窗口或使用 ProcessFunction
而不是窗口 API 来生成解决方案。
问题内容: 我正在尝试将某些Windows函数(和)用于数据框,但我不知道如何使用它们。 有人可以帮我吗?在Python API文档 中,没有关于它的示例。 具体来说,我正在尝试获取数据框中数字字段的分位数。 我正在使用Spark 1.4.0。 问题答案: 要使用窗口功能,您必须先创建一个窗口。定义与普通SQL几乎相同,这意味着您可以定义顺序,分区或同时定义两者。首先让我们创建一些虚拟数据: 确保
main.java--(src/sample文件夹) studentcontroller.java--(src/sample/controller文件夹) studentdao.java和sexdao.java(数据访问对象)--(src/sample/model文件夹) Student.java(公共类学生和构造器)--(src/sample/model文件夹) oddbc的util下的dbut
我正在通过glfw3库学习OpenGL,我正在使用imgui制作一个有趣的应用程序。 我想做一个小引擎或CAD工具。 问题是我不能在imgui窗口中呈现我想要的。 我搜索了谷歌,找到了以下链接: https://gamedev.stackexchange.com/questions/150214/render-in-a-imgui-window https://gamedev.stackexcha
问题内容: 我在Xcode中创建了一个空项目。 当我按下运行按钮时,它显示一个窗口。如何更改其外观,例如窗口的透明度等。 我已经搜索了很多,但是每个人都使用变量来像这里一样进行更改,但是如何创建实例? 我是Mac应用程序开发的新手。有人可以写一个详细的答案吗? 谢谢! 问题答案: 您可以按如下方式获取应用程序窗口的实例: 从NSApp获取窗口(共享应用程序实例的全局常量): 或覆盖或访问视图的wi
我可否问一问作出决定的理由是什么?我是否可以推断,如果数据的到达非常不规则(50%进入定义的窗口长度,而其他50%没有),窗口方法的结果更有偏差(因为50%的事件被丢弃)? 另一方面,在使用状态时,我们是否花费更多的时间检查和更新状态?
问题内容: 我刚开始使用PyQt5。我一直在尝试完成一个看似非常简单的任务,但没有获得足够的信息。经过大量的谷歌搜索,我已经能够关闭一个窗口,并在另一个UI加载时启动另一个窗口,但这不是我要在这里做的。 我想在同一窗口中切换UI。我正在将UI文件作为全局变量加载到我的python文件中,其中每个UI有2个类。当我单击一个UI中的特定按钮时,我想切换到同一窗口中的另一个UI。下面是代码示例: 问题答