Question:

How to use window state with Flink 1.7.1 session windows

农飞星
2023-03-14

I am building a Flink application with the following goals:

  1. Collect events into keyed session windows that are triggered by inactivity
  2. Emit a copy of each input event early, enriched with a session reference
  3. Emit session updates when a session opens and closes, along with the collected session statistics (when the session closes)

With tumbling windows I have been able to achieve the above, but with session windows I have not succeeded.

My window-processing code is as follows:

package io.github.streamingwithflink.jv

import java.util.Calendar

import io.github.streamingwithflink.util.{MySensorSource, SensorReading, SensorTimeAssigner}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

object MySessionWindow {

  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // configure watermark interval
    env.getConfig.setAutoWatermarkInterval(1000L)

    // ingest sensor stream
    val sensorData: DataStream[SensorReading] = env
      // SensorSource generates random temperature readings
      .addSource(new MySensorSource)
      // assign timestamps and watermarks which are required for event time
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    val sessionizedEvents = sensorData
      .keyBy(_.id)
      // a session window with 1.5 second gap
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(1500)))
      // a custom trigger that fires every event received
      .trigger(new MyTrigger)
      // count readings per window
      .process(new MySessionWindowFunction)

    sessionizedEvents.print()

    // retrieve and print session output
    val sessionOutput: DataStreamSink[String] = sessionizedEvents
      .getSideOutput(new OutputTag[String]("session-status"))
      .print()

    env.execute()
  }
}

/** A trigger that fires with every event placed in window. */
class MyTrigger
    extends Trigger[SensorReading, TimeWindow] {

  override def onElement(
      r: SensorReading,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {

    if (timestamp >= window.getEnd) {
      TriggerResult.FIRE_AND_PURGE
    }
    else  {
      TriggerResult.FIRE
    }
  }

  override def onEventTime(
                            timestamp: Long,
                            window: TimeWindow,
                            ctx: Trigger.TriggerContext): TriggerResult = {
    // Continue. not using event time timers
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // Continue. We don't use processing time timers
    TriggerResult.CONTINUE
  }

  override def canMerge: Boolean = true

  override def onMerge(
      window: TimeWindow,
      ctx: Trigger.OnMergeContext): Unit = {
    // Nothing to do: this trigger keeps no state that would need merging
  }

  override def clear(
      window: TimeWindow,
      ctx: Trigger.TriggerContext): Unit = {
    // No trigger state to clear
  }
}

/** A window function that counts the readings per sensor and window.
  * The function emits the sensor id, session reference and temperature . */
class MySessionWindowFunction
  extends ProcessWindowFunction[SensorReading, (String, Int, Double), String, TimeWindow] {

  override def process(
      key: String,
      ctx: Context,
      readings: Iterable[SensorReading],
      out: Collector[(String, Int, Double)]): Unit = {

    // count readings
    val cnt = readings.count(_ => true)
    val curTime = Calendar.getInstance.getTimeInMillis
    val lastTime = readings.last.timestamp

    val sessionRefDesc = new ValueStateDescriptor[Int]("sessionRef", classOf[Int])
    val sessionRef = ctx.windowState.getState[Int](sessionRefDesc)
    val sessionCountDesc = new ValueStateDescriptor[Int]("sessionCount", classOf[Int])
    val sessionCount = ctx.windowState.getState[Int](sessionCountDesc)
    // Side output for session
    val sessionStatus: OutputTag[String] =
      new OutputTag[String]("session-status")
    // create a new sessionRef every time new window starts
    if (cnt == 1) {
      // set sessionRef for first element
      val sessionRefValue = new Random().nextInt(998) + 1
      sessionRef.update(sessionRefValue)
      ctx.output(sessionStatus, s"Session opened: ${readings.last.id}, ref:${sessionRef.value()}")
    }
    sessionCount.update(cnt)
    out.collect((readings.last.id, sessionRef.value(), readings.last.temperature))
  }

  override def clear(
                      ctx: Context): Unit = {
    // Clearing window session context
    val sessionRefDesc = new ValueStateDescriptor[Int]("sessionRef", classOf[Int])
    val sessionRef = ctx.windowState.getState[Int](sessionRefDesc)
    val sessionCountDesc = new ValueStateDescriptor[Int]("sessionCount", classOf[Int])
    val sessionCount = ctx.windowState.getState[Int](sessionCountDesc)
    // println(s"Clearing sessionRef ${sessionRef.value()}")
    // Side output for session
    val sessionOutput: OutputTag[String] =
      new OutputTag[String]("session-status")
    ctx.output(sessionOutput, s"Session closed: ref:${sessionRef.value()}, count:${sessionCount.value()}")
    sessionRef.clear()
    sessionCount.clear()
    super.clear(ctx)
  }
}

The following source generates the input I am using:

package io.github.streamingwithflink.util

import java.util.Calendar

import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

import scala.util.Random

/**
  * Flink SourceFunction to generate SensorReadings with random temperature values.
  *
  * Each parallel instance of the source simulates one sensor, which emits sensor
  * readings spaced by a progressive delay capped at 3 seconds (1, 2, 3, 1, 2, 3, 1, ...).
  */
class MySensorSource extends RichSourceFunction[SensorReading] {

  // flag indicating whether source is still running.
  var running: Boolean = true

  /** run() continuously emits SensorReadings by emitting them through the SourceContext. */
  override def run(srcCtx: SourceContext[SensorReading]): Unit = {

    // initialize random number generator
    val rand = new Random()
    // look up index of this parallel task
    val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask

    // initialize sensor ids and temperatures
    var curFTemp = (1 to 1).map {  // Slow
      i => ("sensor_" + (taskIdx * 10 + i), 65 + (rand.nextGaussian() * 20));
    }

//    curFTemp.foreach(t => System.out.println(t._1, t._2))

    // emit data until being canceled
    var waitTime = 0
    while (running) {
      // Progressive 1s delay, with 3s max: 1, 2, 3, 1, 2, 3, 1, ...
      waitTime = waitTime % 3000 + 1000
      // update temperature
      curFTemp = curFTemp.map( t => (t._1, t._2 + rand.nextGaussian() * 0.5) )
      // get current time
      val curTime = Calendar.getInstance.getTimeInMillis

      // emit new SensorReading
      curFTemp.foreach({t => srcCtx.collect(SensorReading(t._1, curTime, t._2))})
//      curFTemp.foreach(t => println(s"TX: id:${t._1}, ts:${curTime}, temp:${t._2}"))
      Thread.sleep(waitTime)
    }
  }

  /** Cancels this SourceFunction. */
  override def cancel(): Unit = {
    running = false
  }

}

case class SensorReading(id: String, timestamp: Long, temperature: Double)

When I run this with the session window, the following exception is thrown:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645)
    at io.github.streamingwithflink.jv.MySessionWindow$.main(MySessionWindow.scala:55)
    at io.github.streamingwithflink.jv.MySessionWindow.main(MySessionWindow.scala)
Caused by: java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$MergingWindowStateStore.getState(WindowOperator.java:678)
    at io.github.streamingwithflink.jv.MySessionWindowFunction.process(MySessionWindow.scala:126)
    at io.github.streamingwithflink.jv.MySessionWindowFunction.process(MySessionWindow.scala:111)
    at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:370)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

I hope I am just missing a trick here, because not being able to store state with session windows feels very restrictive.

Any pointers would be greatly appreciated.

1 Answer

归浩博
2023-03-14

Session windows are indeed rather special. As each new event arrives, it is first assigned to its own window, after which the set of all current session windows is processed and any possible merging is performed (based on the session gap). With this approach there is never really a stable notion of which session a given event belongs to, and that makes the concept of per-window state rather awkward, which is why it is not supported.
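To make the assignment step concrete, here is a small illustrative sketch (not part of the original job; the object name SessionAssignmentDemo and the sample reading are invented). It calls the session assigner directly: each element first gets a proto-window of [timestamp, timestamp + gap), and only afterwards does the window operator merge overlapping proto-windows of the same key into the actual session window.

package io.github.streamingwithflink.jv

import io.github.streamingwithflink.util.SensorReading
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, WindowAssigner}
import org.apache.flink.streaming.api.windowing.time.Time

object SessionAssignmentDemo {

  def main(args: Array[String]): Unit = {
    // a single reading, as produced by the job above
    val reading = SensorReading("sensor_1", 10000L, 70.0)

    // the same assigner the job uses: 1.5 second gap
    val assigner = EventTimeSessionWindows.withGap(Time.milliseconds(1500))

    // minimal context, only needed to satisfy the assignWindows signature
    val assignerCtx = new WindowAssigner.WindowAssignerContext {
      override def getCurrentProcessingTime(): Long = System.currentTimeMillis()
    }

    // every element is first assigned its own proto-window [timestamp, timestamp + gap);
    // overlapping proto-windows of the same key are merged later by the window operator
    val protoWindows = assigner.assignWindows(reading, reading.timestamp, assignerCtx)
    println(protoWindows) // e.g. [TimeWindow{start=10000, end=11500}]
  }
}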

You may be able to produce a solution using session windows together with globalState, or by using a ProcessFunction instead of the window API.
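For the globalState route, a minimal sketch (untested; it assumes the pipeline and the custom trigger above stay unchanged, and the class name MySessionWindowFunctionGlobal is only an illustrative stand-in for MySessionWindowFunction) could look like this. ctx.globalState is keyed state shared by all windows of a key, so it is not affected by window merging, but it has to be cleared explicitly in clear() so the session reference does not leak into the next session of the same key.

package io.github.streamingwithflink.jv

import io.github.streamingwithflink.util.SensorReading
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

/** Same contract as MySessionWindowFunction, but the session reference and count
  * live in per-key global state instead of the unsupported per-window state. */
class MySessionWindowFunctionGlobal
  extends ProcessWindowFunction[SensorReading, (String, Int, Double), String, TimeWindow] {

  private val sessionRefDesc = new ValueStateDescriptor[Int]("sessionRef", classOf[Int])
  private val sessionCountDesc = new ValueStateDescriptor[Int]("sessionCount", classOf[Int])
  private val sessionStatus = new OutputTag[String]("session-status")

  override def process(
      key: String,
      ctx: Context,
      readings: Iterable[SensorReading],
      out: Collector[(String, Int, Double)]): Unit = {

    // keyed state, shared by all windows of this key and unaffected by merging
    val sessionRef = ctx.globalState.getState(sessionRefDesc)
    val sessionCount = ctx.globalState.getState(sessionCountDesc)

    val cnt = readings.size
    if (cnt == 1) {
      // first element of a new session: assign a fresh session reference
      sessionRef.update(new Random().nextInt(998) + 1)
      ctx.output(sessionStatus, s"Session opened: $key, ref:${sessionRef.value()}")
    }
    sessionCount.update(cnt)
    out.collect((key, sessionRef.value(), readings.last.temperature))
  }

  override def clear(ctx: Context): Unit = {
    val sessionRef = ctx.globalState.getState(sessionRefDesc)
    val sessionCount = ctx.globalState.getState(sessionCountDesc)
    ctx.output(sessionStatus, s"Session closed: ref:${sessionRef.value()}, count:${sessionCount.value()}")
    // global state survives the window, so reset it for the next session of this key
    sessionRef.clear()
    sessionCount.clear()
  }
}

To try it, only the .process(new MySessionWindowFunction) call in the pipeline would change to .process(new MySessionWindowFunctionGlobal).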
