当前位置: 首页 > 知识库问答 >
问题:

在两个流之间进行间隔连接时,似乎不会删除延迟事件

卜存
2023-03-14

我正在使用Flink 1.11,我有以下测试用例来尝试基于事件时间的间隔连接。

两个流的数据定义如下:

object JoinStockInterval {
  //the stocks data,
  //ts is the implicit method that converts the time string to timestamp
  val stocks = Seq(
    Stock("id1", "2020-09-16 20:50:15".ts, 1),
    Stock("id1", "2020-09-16 20:50:12".ts, 2),
    Stock("id1", "2020-09-16 20:50:18".ts, 4),
    Stock("id1", "2020-09-16 20:50:11".ts, 3),
    Stock("id1", "2020-09-16 20:50:11".ts, 10),
    Stock("id1", "2020-09-16 20:50:13".ts, 5),
    Stock("id1", "2020-09-16 20:50:20".ts, 6),
    Stock("id1", "2020-09-16 20:50:14".ts, 7),
    Stock("id1", "2020-09-16 20:50:22".ts, 8),
    Stock("id1", "2020-09-16 20:50:40".ts, 9),
    Stock("id1", "2020-09-16 20:50:15".ts, 100)
  )

  //Mock that the stock name is changing over time
  val stockNameChangings = Seq(
    StockNameChanging("id1", "Stock1", "2020-09-16 20:50:16".ts),
    StockNameChanging("id1", "Stock101", "2020-09-16 20:50:20".ts),
    StockNameChanging("id1", "Stock4", "2020-09-16 20:50:17".ts),
    StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts),
    StockNameChanging("id1", "Stock5", "2020-09-16 20:50:17".ts),
    StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts),
    StockNameChanging("id1", "Stock6", "2020-09-16 20:50:23".ts)

  )

}

测试用例定义如下,每个允许4秒延迟:

 test("test interval join inner 2 works") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val ds1 = env.addSource(new IntervalJoinStockSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000)) //allow 4 secs lateness
    val ds2 = env.addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000)) //allow 4 secs lateness
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("s1", ds1, $"id", $"price", $"trade_date".rowtime() as "rt1")
    tenv.createTemporaryView("s2", ds2, $"id", $"name", $"trade_date".rowtime() as "rt2")
    tenv.from("s1").printSchema()
    tenv.from("s2").printSchema()
    val sql =
      """
      select s1.id, s2.name, s1.price, cast (s1.rt1 as timestamp) as rt1, s2.rt2
      from s1 join s2
      on s1.id = s2.id
      where s1.rt1 between s2.rt2 - interval '2' second and s2.rt2 + interval '2' second

      """.stripMargin(' ')

    tenv.sqlQuery(sql).toAppendStream[Row].print()
    env.execute()
  }

连接结果如下:

id1,Stock1,1.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock1,4.0,2020-09-16T12:50:18,2020-09-16T12:50:16
id1,Stock101,4.0,2020-09-16T12:50:18,2020-09-16T12:50:20
id1,Stock4,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock4,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock5,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock101,6.0,2020-09-16T12:50:20,2020-09-16T12:50:20
id1,Stock7,6.0,2020-09-16T12:50:20,2020-09-16T12:50:21
id1,Stock501,6.0,2020-09-16T12:50:20,2020-09-16T12:50:22
id1,Stock1,7.0,2020-09-16T12:50:14,2020-09-16T12:50:16
id1,Stock101,8.0,2020-09-16T12:50:22,2020-09-16T12:50:20
id1,Stock501,8.0,2020-09-16T12:50:22,2020-09-16T12:50:22
id1,Stock7,8.0,2020-09-16T12:50:22,2020-09-16T12:50:21
id1,Stock6,8.0,2020-09-16T12:50:22,2020-09-16T12:50:23
id1,Stock1,100.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock4,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17

奇怪的是,上面结果中的最后一条记录来自股票流中的股票(“id1”,“2020-09-16 20:50:15”.ts,100),但这条记录在股票流中出现较晚。看到股票流中的以下两条记录,我已经在这个问题上坚持了好几天,我想问为什么这个记录没有被放弃,而是成功地与另一条流(改名流)结合在一起

    Stock("id1", "2020-09-16 20:50:40".ts, 9),
    Stock("id1", "2020-09-16 20:50:15".ts, 100)

水印策略使用< code > assigner with punctuated watermark

共有1个答案

景英杰
2023-03-14

你想知道的唱片

Stock("id1", "2020-09-16 20:50:15".ts, 100)

从加入的角度来看不算晚。

原因在于,在一个操作符有多个输入的情况下(如区间连接),水印是如何传播的。join运算符处的当前水印总是迄今为止从所有输入通道接收到的最小水印。

因此,在联接处理完该记录之前

StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts)

连接处的水印由该记录确定

StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts)

因此水印仍然落在为连接定义的间隔内。

水印以这种方式工作,因为它们代表了一种断言,即流现在可以被认为是完整的,直到水印的时间戳。从连接的角度来看,它只对最远的流的水印有完整的知识。

 类似资料:
  • 问题内容: 是否有一个跟踪用户某些事件的表。 总是有一个动作,之后可能会有一个动作。 现在,我想查询这两个动作之间的时间差,以获取用户和之间的time_diff 。 现在,您可以假定没有多个条目(例如,至少一个,最大另一个)。 我想要这样的结果: 问题答案: 您可以使用以下查询: 该子句过滤掉仅包含一个动作的组,例如OP中的with记录。 演示在这里

  • 我的应用程序中有三个片段,其中需要传递和接收数据。我应该如何进行他们之间的沟通。我试图参考许多网站,但没有解决方案。 请给我推荐一些好的链接。 提前感谢。

  • 问题内容: 如何在Python中进行时间延迟? 问题答案: 这是另一个示例,其中某件事大约每分钟运行一次:

  • 我需要将两个数据集与CLOSE时间戳连接起来。第一个数据集是来自移动应用程序的日记数据集: 在这里: 第二个数据集是来自加速度计日志的数据集,显示移动(=INVH)或空闲(=NIVH): 在这里: 我需要根据时间戳字段之间的时间差连接两个数据帧。例如,在df1上留下join,以查看应用程序日志数据如何与实际加速度计日志一致。简单的左连接在这里不起作用,因为在大多数情况下有一个滞后时间。所以我的问题

  • 问题内容: 我想知道一段时间后如何调用函数。我已经尝试过time.sleep(),但是这会暂停整个脚本。我希望脚本继续进行,但是??? secs之后调用一个函数并同时运行其他脚本 问题答案: 看一看。它在新线程中运行您的函数。

  • 我经常看到两个参与者之间有很长的延迟(60+秒),从第一个参与者发送消息到第二个参与者,以及第二个参与者的方法随消息实际调用时。我可以寻找哪些类型的东西来调试这个问题? ActorA的每个实例都使用为ActorB发送一条消息。在ActorA中调用方法并在ActorB的开始处获得另一个时间戳之后,我立即收集了一个毫秒时间戳(使用)。这些时间戳之间的间隔一致为60秒或更长。具体地说,当按时间绘制时,该