当前位置: 首页 > 知识库问答 >
问题:

从kafka主题使用空闲分区时,水印生成在本地/独立模式下的工作方式有所不同吗?

澹台臻
2023-03-14

我正在考虑这个水印:

class MyWatermarker(val maxTimeLag: Long = 0)
    extends AssignerWithPeriodicWatermarks[MyEvent] {
  var maxTs: Long = 0

  override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
    val timestamp = e.timestamp
    maxTs = maxTs.max(timestamp)
    timestamp
  }

  override def getCurrentWatermark: WatermarkOld = {
    println(s"event watermark: ${maxTs - maxTimeLag}")
    new WatermarkOld(maxTs - maxTimeLag)
  }

基础事件来自 kafka 源,然后传递给流程函数。实现与问题无关,我将只分享相关的位:

  override def processElement(
    event: MyEvent,
    ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
    out: Collector[StreamEvent]
  ): Unit = {
    println(
      s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
    )
  ...
  }

当我在一个真实的kubernetes集群上运行这个应用程序时,使用了一个具有空闲分区的kafka源主题,水印如预期的那样保持为0:

In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0

我还可以看到水印中生成的这些日志:

event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0

有趣的是,当我在IntelliJ上本地运行同一个应用程序,并且同一主题也有空闲的kafka分区时,我还从水印中获得了上述日志,水印在0和最近接收到的元素的ts之间振荡,因为maxLag=0。然而,对于我来说,出乎意料的是,来自process函数的日志显示水印仍在前进:

In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618

为什么会发生这种情况?仅供参考,我使用的是 Flink 1.10,环境并行度设置为 2,并且在这两种情况下都有事件时间语义。

共有1个答案

田博远
2023-03-14

如果要使用每个分区的水印,可以直接在Flink Kafka消费者[1]上调用assignTimestampsAndWatermarks,那么我相信空闲分区将始终保留整个水印。

使用每个分区水印,每个Kafka源任务将分别将水印应用到它正在处理的每个分区,然后发出每个分区水印中最小的水印作为其水印。因此,至少有一个Kafka源任务的水印为0,假设您在水印之后和进程函数之前有一个keyBy,这将在进程函数处保留水印。

否则,如果将水印应用于 Kafka 源任务的输出,则水印任务的水印是否为 0 取决于其相应的 Kafka 源任务是否具有任何非空闲分区。如果将分区分配给实例不是确定性的,这可以解释为什么您在IntelliJ中看到不同的结果。

请注意,在Flink 1.11 [2]中,对空闲资源的处理已经重新进行了,与此相关的错误修复仍然悬而未决[3]。

[1]https://ci . Apache . org/projects/flink/flink-docs-release-1.10/dev/connectors/Kafka . html # Kafka-consumers-and-timestamp-extraction watermark-emission。< br >[2]https://ci . Apache . org/projects/flink/flink-docs-release-1.11/dev/event _ timestamps _ watermark s . html #处理-idle-sources。[3]https://issues.apache.org/jira/browse/FLINK-18934

 类似资料:
  • 当spark作业中有需要的jar文件时,需要通过2种方式将其添加到spark作业中: 1。命令中的选项。 2。。 有人能告诉我这两种方式之间的区别吗? 从这个问题来看,答案是它们是相同的,只是优先级不同,但我认为这不是真的。如果我在yarn集群模式下提交spark作业,那么根据官方站点,如果命令中的选项中没有包含jar文件,那么addJar()将不起作用。 如果您将sparkcontext.add

  • 我有一个夸克斯Kafka消费者。在 VM 模式下,它工作正常。 在我用:。/mvnw包-可选 当我在纯模式下运行它时,我有这个例外:

  • 我有@KafkaListener使用topicPattern与正则表达式,工作正常(foo。*),但现在我想将侦听器分配给所有匹配主题的所有分区。 https://docs.spring.io/spring-kafka/docs/2.6.1/reference/html/#tip-assign-all-parts并没有真正帮助我,因为我不知道主题名称。

  • 我试图详细学习JavaEE7,但我在从数据库中获取记录并在JSF页面上显示它们方面遇到了问题。 我使用Wildfly10.1.0和Oracle XE11。我创建了以下数据源: 在JBoss的管理接口中的连接测试是成功的。 这是我的: 当我通过运行WildFly,并通过部署我的应用程序时,它可以工作。 当我在Eclipse中启动服务器并尝试使用相同的命令部署应用程序时,它失败了--因为我添加了JPA

  • 如何确定spark独立群集模式上的工作线程数?在独立群集模式下添加工作线程时,持续时间将缩短。 例如,对于我的输入数据3.5 G,WordCount需要3.8分钟。但是,在我添加了一个内存为4 G的工作器后,需要2.6分钟。 增加调谐火花的工人可以吗?我正在考虑这方面的风险。 我的环境设置如下:, 内存128克,16个CPU,用于9个虚拟机 输入数据信息 HDFS中的3.5 G数据文件

  • 在启动hbase时(在独立模式下),使用/bin/start hbase。在hbase目录中, 我得到以下错误, 如何解决这个错误? 我的hbase网站。xml文件如下所示: 在运行hbase时,我得到了错误... 在hbase-env.sh,我有 如何解决我的错误启动hbase??