当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:水印不随广播流进行

乔俊才
2023-03-14

有1个高通量Kafka流定义如下

val stream: DataStream[A] = flinkEnv
  .addSource(kafkaStreamSource)
  .assignTimestampsAndWatermarks(
     WatermarkStrategy
        .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
        .withIdleness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner[A] {
            override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
                element.lastUpdatedAt
              }
            }
        )
  )
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .reduce(.......)

上述窗口操作符的水印正确转发。

上述窗口操作符中的DataStream[A]需要使用一些保存在某些S3文件中的信息来丰富。S3文件很少更新。

S3文件作为流读取,然后广播以丰富DataStream[a]中的元素。

 val textInputFormat = new TextInputFormat(new Path("s3 path...."))
 val enrichWithElements: BroadcastStream[EnrichWithElement] = flinkEnv.readFile(textInputFormat, "s3 path ...", FileProcessingMode.PROCESS_CONTINUOUSLY, 30)
    .map(s3Element => {
      EnrichWithElement(.....)
    })
    .broadcast(new MapStateDescriptor......)

然后连接这两个流,用EnrichWithElement类型的元素来丰富A类型的所有元素。

class EnrichedAProcess
    extends BroadcastProcessFunction[A,EnrichWithElement,EnrichedAElement] {
  override def processElement(
      value: A,
      ctx: Context,
      out: Collector[EnrichedAElement]): Unit = {
      .....
      out.collect(EnrichedAElement(....))
  }

  override def processBroadcastElement(
      value: EnrichWithElement,
      ctx: Context,
      out: Collector[EnrichedAElement]): Unit = {
      .........
  }
}
stream
  .connect(enrichWithElements)
  .process(new EnrichedAProcess)

EnrichedAProcess有2个输入。其中之一是不断转发水印,但广播流没有任何时间信息或水印。这导致EnrichedAProcess的水印根本无法转发,因为它的一个输入没有传入水印。

是否有办法指定EnrichedProcess的水印仅依赖于非广播输入。

共有1个答案

凌伟泽
2023-03-14

操作员将多个输入信道设置其自己的水印,以从所有活动信道接收的最新水印的最小值。

您可以做的是将水印策略应用于始终返回MAX_WATERMARK作为其水印的广播流。(您不需要担心为该流分配时间戳。)

 类似资料:
  • 收听电台广播的流媒体直播,还可以录制广播。 作者说:有问题欢迎和我QQ信箱交流:10040142@qq.com [Code4App.com]

  • 当我这样做时,我得到了一个java.lang.ClassCastException:不能将Scala.some实例分配给org.apache.spark.accumulator实例中scala.option类型的字段org.apache.spark.accumulable.name与硬编码ArrayBuffer相同的代码工作得很好,所以我假设它与静态文件资源有关...有人知道我可能做错了什么吗?任

  • 问题内容: 我有一堆添加到的生产者线程和一个接收对象的工作线程。现在,我想扩展它,以使两个工作线程可以接收对象,但是对对象执行不同的工作。这是一个转折: 我希望 两个 接收线程都处理已放在队列中的对象。 如果我继续使用BlockingQueue,则两个线程将争用对象,只有一个工作线程将获取对象。 因此,我正在寻找类似于BlockingQueue的东西,但是具有广播行为。 应用程序:生产者线程实际上

  • 输出如下: 如果两个数组的维数不相同,则元素到元素的操作是不可能的。 然而,在 NumPy 中仍然可以对形状不相似的数组进行操作,因为它拥有广播功能。 较小的数组会广播到较大数组的大小,以便使它们的形状可兼容。 如果满足以下规则,可以进行广播: 如果输入在每个维度中的大小与输出大小匹配,或其值正好为 1,则在计算中可它。 如果上述规则产生有效结果,并且满足以下条件之一,那么数组被称为可广播的。 数

  • 原文:Broadcasting 另见:numpy.broadcast 术语广播描述了NumPy在算术运算时如何处理不同形状的数组。 在某些条件下,较小的数组“广播”成较大的数组以便有相同的形状。 广播提供了一种矢量化操作数组的方法,这样可以在C而不是Python中进行循环。 它可以在不制作不必要的数据副本的情况下实现这一点,并且通常可以实现高效 然而,有些情况下广播是一个坏主意,因为它会导致内存使

  • 在h5中用视频播放器DPlayer 监听竖屏和横屏(webfullscreen、fullscreen),更改水印范围,在竖屏的时候没问题,在横屏的时候看不到水印