当前位置: 首页 > 知识库问答 >
问题:

Apache Flink TwoInputStreamOP运算符的水印行为

东门航
2023-03-14

有2个指定了时间戳的数据流和定义如下的水印生成器。

val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[A] {
            override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[B] {
            override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

当这两个流在一个操作符中连接时,来自streamA或streamB的最小水印作为连接操作符的水印。

class CombineAB extends CoProcessFunction[A, B, C] {
   override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
        out.collect(C(elem.x, elem.y, time.Now()))
   }
   override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
        out.collect(C(elem.x, elem.y, time.Now()))
   }
}

val streamC: DataStream[C] = streamA.connect(streamB)
      .process(new CombineAB)

组合B运算符的水印是A或B中的最小值。基于C类元素是否标记为延迟。

但是,由于我们没有附加任何分配给C的时间戳,这是否意味着AssociineAB运算符中的任何元素都没有被标记为延迟?因此在C上窗口不会有任何延迟记录被删除?

假设我们将分配的时间戳和水印生成器附加到C,如下所示,那么这是否意味着来自A和B的水印被完全忽略,AssociineAB的水印仅取决于C的时间戳字段和C定义的延迟。

     streamC.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[C] {
            override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
              element.updatedTime
            }
          })
      )

是否有一种方法可以将时间戳赋值器附加到C,并且组合B的水印仍然是a和B的最小值,并且C的元素根据C的指定时间戳和组合B的wartermark标记得较晚

更新:CombineAB的优化实施

共有1个答案

龙嘉誉
2023-03-14

以下几点:

forBoundedOutOfOrness[A](Duration.of秒(0))是不寻常的。任何乱序事件都会迟到。为什么不使用forMonotonousTimestamps()

CombineAB产生的记录将有时间戳;无需对此流应用assignTimestampsAndWatermarks。收集器生成的任何记录的时间戳都是传入记录的时间戳。

如果在流C上调用assignTimestampsAndWatermarks,传入的水印将被过滤掉,您需要生成新的水印。

 类似资料:
  • Flink源函数引入水印,这些水印向下传递给下游操作符,根据这些操作符可以执行不同的基于时间的操作。对于使用多个流的操作员,将传入水印的最小值视为此时操作员的水印。 将源流拆分为多个逻辑流,然后将这些逻辑流传递给下游操作员(例如处理函数)。 Eg. 假设Process函数有4个子任务(例如),并且有100个关键组(假设),每个子任务处理25个关键组,即,等等。 如果从下午5点开始DriverStr

  • 我读到了四个Kinesis流的数据。每个流中的数据都是不同的数据类型。读取所有四个流后,我分配时间戳和水印,并聚合来自每个流的数据。四个聚合的结果都是使用相同的泛型对象输出的。我想合并四个流的结果,这样我就可以将合并后的流发送到一个ProcessFunction。这基本上允许我像使用CoProcessFunction一样使用ProcessFunction,但我可以处理来自两个以上流的数据(在本例中

  • 问题内容: 我经常使用Python的语句来显示数据。是的,我知道方法,方法和方法。我也知道splat运算符()可用于将Iterable扩展为函数参数。但是,我似乎无法通过声明来做到这一点。使用清单: 使用元组: 我想念什么吗?这根本不可能吗?接下来到底是什么?该文档说该关键字后面是逗号分隔的表达式列表,但是我想这与列表数据类型不同。我在SO和Web上做了很多挖掘工作,却没有找到明确的解释。 我正在

  • 在Flink中,我发现了2种设置水印的方法, 第一个是 第二个是 我想知道哪个最终会生效。

  • 问题内容: 我正在阅读有关ANY和ALL运算符的oracle文档。除了一件事,我非常了解它们的用途。它指出: 全部 : 如果子查询返回零行,则条件的计算结果为TRUE。 任何 : 如果子查询返回零行,则条件的计算结果为FALSE。 在我看来,这不是很合逻辑。为什么在一个空子查询上的ALL将返回TRUE,而ANY返回FALSE? 我是SQL的新手,所以我认为它会有一个用例来说明这种行为,这对我来说确

  • 问题内容: 为什么此代码有效? 为什么这会引发异常? 但是最奇怪的是,该代码也可以成功运行,没有任何异常: 看来Java的三元运算符会改变行为。有人可以解释为什么吗? 问题答案: 该行为在JLS- 条件运算符中 指定: 如果第二和第三个操作数中的一个是原始类型T的,并且其他的类型是施加装箱转换(§5.1.7)到T的结果,则 条件表达式的类型为T 。 强调我的。因此,在第二种情况下: 由于第三个操作