当前位置: 首页 > 知识库问答 >
问题:

flink的Kafka水印策略在我的应用程序中不起作用

艾俊晖
2023-03-14

我使用flink版本1.13.0

当我试图使用flink doc的Kafka水印策略时,这似乎不起作用,窗口处理功能将不会运行。

我想知道,在Kafka中,水印的时间戳将使用消费时间还是生产时间?

我的消费者代码如下:

val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
  .setCommitOffsetsOnCheckpoints(true)
  .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
stream = env.addSource(source)
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)

并像这样使用窗口:

processStream
  .keyBy(_.num)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .aggregate(new LatTimeAggregate(), new SignLatCalculateProcess())
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)
  .addSink(new SignLatSink(serverConfig.smsRuleRedis))
  .name("lat_count_sink")
  .uid("lat_count_sink")

拓扑图是这样的:

共有1个答案

武功
2023-03-14

由于您没有在水印策略中指定时间戳分配器,因此您依赖FlinkKafkaConsumer为流记录分配时间戳。只有当从Kafka读取的记录的标题中有时间戳时,这才有效。否则,您将需要实现一个时间戳赋值器来从事件中提取时间戳。

请注意,除非您还实现了一个反序列化器,FlinkKafkaConsumer可以使用该反序列化器生成具有时间戳的对象,然后可以提取这些时间戳,否则您将无法实现FlinkKafkaConsumer可以应用的时间戳赋值器。否则,您可以选择在源之后的某个位置应用水印策略。

如果没有时间戳不是问题,那么还有其他问题。例如,您可能有一个空闲的Kafka分区,或者缺少足够远的事件来关闭窗口。

顺便说一下,如果您的事件在每个分区的基础上是有序的,并且如果您在FlinkKafkaConsumer上调用AssignTimestamps和Watermarks(您当前正在这样做),那么您可以使用FormononousTimestamps,而不是ForBoundedAutoforderness,这有一些显著的优点。

 类似资料:
  • 我是Flink的新手,所以在定义Flink中的水印时,我面临一些问题。 让我们从Kafka消费者开始。使用的反序列化是JSONKeyValueDeserializationSchema,因此没有自定义解析。 如果将接收器应用于此代码,则其工作正常。问题是需要水印来避免无序事件。这就是我写的策略: 在做了一些研究后,我最终得到了这段代码,但这不起作用。这些是我的问题: 在这里使用ObjectNode

  • 我们正在构建一个流处理管道,以使用Flink v1.11和事件时间特性来处理Kinesis消息。在定义源水印策略时,在官方留档中,我遇到了两个开箱即用的水印策略;forBoundedOutOfOrthy和forMonotonousTimestamps。但根据我对上述内容的理解,我认为这些不适合我的用法。以下是我的用法细节: 来自输入流的数据:(包含每分钟带有时间戳的数据) 现在,我想处理11:00

  • 我们正在构建一个流处理管道来处理/摄取Kafka消息。我们正在使用Flink v1.12.2。在定义源水印策略时,在官方留档中,我遇到了两种开箱即用的水印策略;forBoundedOutOfOrness和forMonotonousTimestamps。我确实浏览了javadoc,但并不完全理解何时以及为什么你应该使用一种策略而不是另一种策略。时间戳基于事件时间。谢谢。

  • 我不确定我的Flink应用程序是否需要水印。什么时候有必要? 如果我不需要它们,水印策略的目的是什么。noWatermarks()?

  • 我设置我的类,以便使用Laravel授权和策略功能。但是,在为我的方法定义中间件时,我一直遇到这个错误(类App\Policies\StatusPolicy不存在)。这就是我所拥有的: AuthServiceProvider。php ontroller.php 状态策略。php(由php artisan生成):策略状态策略--model=Status

  • 下面是我用raspberry PI的python(Thonny Idle)编写的代码。 请忽略Url,它不是真实地址。密码 错误 回溯(最近一次呼叫最后一次): 文件“/home/pi/Documents/PythonCode/TestingFirebase-1.py”,第17行,在 文件“/usr/local/lib/python3.7/dist-packages/firebase/decora