当前位置: 首页 > 知识库问答 >
问题:

Flink 流:从一个窗口,在另一个窗口中查找状态

禄奇希
2023-03-14

我有两条流:

  • 测量
  • WhoMeasured(关于谁进行了测量的元数据)

这些是它们的案例类:

case class Measurement(var value: Int, var who_measured_id: Int)
case class WhoMeasured(var who_measured_id: Int, var name: String)

测量流包含大量数据。WhoMeasured流几乎没有任何可用性。事实上,对于<code>who_measured_id</code>流中的每个<code>who_。这本质上是一个哈希表,由WhoMeasured流填充。

在我的自定义窗口函数中

class WFunc extends WindowFunction[Measurement, Long, Int, TimeWindow] {
  override def apply(key: Int, window: TimeWindow, input: Iterable[Measurement], out: Collector[Long]): Unit = {

    // Here I need access to the WhoMeasured stream to get the name of the person who took a measurement
    // The following two are equivalent since I keyed by who_measured_id
    val name_who_measured = magic(key)
    val name_who_measured = magic(input.head.who_measured_id)
  }
}

这是我的工作。现在你可能会看到,有一些东西不见了:两个流的结合。

val who_measured_stream = who_measured_source
  .keyBy(w => w.who_measured_id)
  .countWindow(1)

val measurement_stream = measurements_source
  .keyBy(m => m.who_measured_id)
  .timeWindow(Time.seconds(60), Time.seconds(5))
  .apply(new WFunc)

因此,从本质上讲,这是一种查找表,当Who量测流中的新元素到达时会被更新。

所以问题是:如何实现从一个窗口流到另一个窗口流的查找?

跟进:

按照 Fabian 建议的方式实现后,作业总是会因某种序列化问题而失败:

[info] Loading project definition from /home/jgroeger/Code/MeasurementJob/project
[info] Set current project to MeasurementJob (in build file:/home/jgroeger/Code/MeasurementJob/)
[info] Compiling 8 Scala sources to /home/jgroeger/Code/MeasurementJob/target/scala-2.11/classes...
[info] Running de.company.project.Main dev MeasurementJob
[error] Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichCoFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
[error]     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[error]     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1478)
[error]     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
[error]     at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:230)
[error]     at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:127)
[error]     at de.company.project.jobs.MeasurementJob.run(MeasurementJob.scala:139)
[error]     at de.company.project.Main$.main(Main.scala:55)
[error]     at de.company.project.Main.main(Main.scala)
[error] Caused by: java.io.NotSerializableException: de.company.project.jobs.MeasurementJob
[error]     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[error]     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error]     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error]     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error]     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error]     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[error]     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
[error]     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[error]     ... 7 more
java.lang.RuntimeException: Nonzero exit code returned from runner: 1
    at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last MeasurementJob/compile:run for the full output.
[error] (MeasurementJob/compile:run) Nonzero exit code returned from runner: 1
[error] Total time: 9 s, completed Nov 15, 2016 2:28:46 PM

Process finished with exit code 1

错误消息:

The implementation of the RichCoFlatMapFunction is not serializable. The object probably contains or references non serializable fields.

但是,我的 JoiningCoFlatMap 唯一的字段是建议的 ValueState

签名如下所示:

class JoiningCoFlatMap extends RichCoFlatMapFunction[Measurement, WhoMeasured, (Measurement, String)] {

共有1个答案

颜祖鹤
2023-03-14

我想你要做的是一个窗口操作,然后是一个连接。

您可以使用有状态的< code>CoFlatMapFunction来实现高容量流和低值按键更新流的联接,如下例所示:

val measures: DataStream[Measurement] = ???
val who: DataStream[WhoMeasured] = ???

val agg: DataStream[(Int, Long)] = measures
  .keyBy(_._2) // measured_by_id
  .timeWindow(Time.seconds(60), Time.seconds(5))
  .apply( (id: Int, w: TimeWindow, v: Iterable[(Int, Int, String)], out: Collector[(Int, Long)]) => {
    // do your aggregation
  })

val joined: DataStream[(Int, Long, String)] = agg
  .keyBy(_._1) // measured_by_id
  .connect(who.keyBy(_.who_measured_id))
  .flatMap(new JoiningCoFlatMap)

// CoFlatMapFunction
class JoiningCoFlatMap extends RichCoFlatMapFunction[(Int, Long), WhoMeasured, (Int, Long, String)] {

  var names: ValueState[String] = null

  override def open(conf: Configuration): Unit = {
    val stateDescrptr = new ValueStateDescriptor[String](
      "whoMeasuredName",
      classOf[String],
      ""                 // default value
    )
    names = getRuntimeContext.getState(stateDescrptr)
  }

  override def flatMap1(a: (Int, Long), out: Collector[(Int, Long, String)]): Unit = {
    // join with state
    out.collect( (a._1, a._2, names.value()) )
  }

  override def flatMap2(w: WhoMeasured, out: Collector[(Int, Long, String)]): Unit = {
    // update state
    names.update(w.name)
  }
}

关于实现的说明:< code>CoFlatMapFunction不能决定处理哪个输入,即< code>flatmap1和< code>flatmap2函数的调用取决于到达操作符的数据。它不能由函数控制。这是初始化状态时的一个问题。开始时,状态可能没有到达的< code>Measurement对象的正确名称,但会返回默认值。您可以通过缓冲测量值并连接它们一次来避免这种情况,来自< code>who流的键的第一个更新到达。为此你需要另一个州。

 类似资料:
  • 目前,我已经开始使用Selenium2.0/Web-Driver为我工作的公司进行自动化测试。 目前我已经开发了大约20个测试,但是当我运行这些测试时,它们会为每个测试打开一个新的浏览器窗口。 我在注册测试用例中运行它,然后 在第二个测试用例中运行,我认为应该将焦点放回第一个窗口。 我还使用关闭正在创建的其他窗口,但我希望它们一开始就不打开。

  • 长话短说,我搜索了与我类似的问题,所以我可以“自己”解决这个问题。我在这里和这里找到了这些例子,但没有一个对我有用。 我需要将在第一个窗口的TextField中键入的字符串传递给下一个窗口。 我的主要: 当我使用FXML时,这是Window1(java1)的控制器: 以及Window2的控制器(java2): 问题指向我用自定义函数(j2.setLbText)设置Text的Java1Control

  • 我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?

  • 我需要一些帮助。我在最后写下了我的问题,并将首先解释我的代码到底应该做什么: 我正在制作一个与其他程序进行通信的程序。我需要我的软件能够点击另一个程序上的按钮,我相信这样做的合适代码是: 具有 但是,我不知道如何获取hWnd,它是同时运行的另一个程序的特定窗口上的特定按钮的句柄。我在某个地方读到,我可能会做以下事情: 哪里: 但是,我对FindWindowEX()函数有一些问题。 问题1:我看到的

  • 问题内容: 我是一个学习敏捷的初学者。在这里的第一篇文章。 额外信息,但可能不必要。 在我创建的此应用中,用户在主View控制器上选择一个图像,并将其传递给第二个View控制器。在那里,图像被分解成碎片,并将这些碎片放置在单独的UIImageViews中。目的是通过交换图像以正确的顺序放置它们。 一切正常,并且在模拟器中运行正常。但是,我试图添加基本动画(移动UIImageViews),但是它们没

  • 问题内容: 我已经制作了一个简单的JavaFX应用程序,但是当用户单击按钮时,我希望主窗口打开一个辅助窗口。 最简单的方法是什么? 问题答案: