当前位置: 首页 > 知识库问答 >
问题:

当源输入已耗尽时,Flink触发进程功能上的注册计时器

程智明
2023-03-14

我正在使用Flink流从包括文件在内的多个资源读取输入。我的目标是定期触发一些计算(流转时长),并在到达文件末尾时触发最终结果。我的处理子拓扑如下所示

myGenerator
   .generateData()
   .map(...)
   .keyBy(...)
   .process(new TriggerFunction(timeout));

我的生成器源之一可以是包含有界数据的文件

<代码>例如环境。readFile(inputFormat、filename、FileProcessingMode.PROCESS\u ONCE、interval、typeInfo)

因此,我想知道是否有一种机制可以捕获源操作符发送的事件,该事件表示已到达输入的末尾,并且预计不会有更多的事件发出拓扑拆卸的信号。

据我所知,当我的源到达输入端(文件)时,它标志着下游操作符的处理结束,因此我没有机会或方法触发通过udf处理函数中注册的定期计时器发出的最终结果。

我已尝试使用“FileProcessingMode”打开文件源。PROCESS\u CONTINUOUSLY“但这似乎更像是一种黑客行为,而不是一种解决方案。我还尝试使用带标点的水印使用水印策略,但我不确定如何从源中捕获\u输入的结尾并转发适当的水印(例如Long.MAX\u值)。

是否有办法确保即使源事件已耗尽,也能触发计时器?

共有1个答案

常子濯
2023-03-14

对于有界源,Flink将自动发送一个带有值的水印。消耗所有输入后的MAX\U水印。因此,为Long创建一个事件时间计时器就足够了。最大值

 类似资料:
  • 我在Azure上有一个函数,包含以下function.json文件: 除非我错了,否则这个函数应该每天运行一次,在凌晨3点? 这是函数的签名: 我到底做错了什么?当我手动触发(在门户中单击“运行”)时,该功能工作正常,但它在今天凌晨3点没有运行,昨天也没有运行。 编辑:所以,正如建议的那样,我已经将计划更改为付费计划,并且我选择了一个动态计划。日志仍然没有说明功能在今天早上3点被激活。

  • 本文向大家介绍React注册倒计时功能的实现,包括了React注册倒计时功能的实现的使用技巧和注意事项,需要的朋友参考一下 一、React版本 16.4.1 二、具体代码如下 设置state属性 2.倒计时 3.jsx代码 明明很简单的,但是看网上有的代码搞得很复杂一样,当然也可以用react相关插件,不过我觉得这样更简洁。 ps:react 获取服务器端时间倒计时 以上就是本文的全部内容,希望对

  • 我目前在Azure中托管了几十个网站,最近开始在每个web应用的门户刀片中看到“内存资源耗尽”警告: 我在两个S3标准(大型)应用程序服务计划中托管我的网站,我在所有网站上都会收到警告,无论它们在哪个应用程序服务计划上。 有趣的是,当查看任一应用服务计划的内存使用率时,我总是低于40%,内存使用率实际上相当一致。我从未看到峰值或任何接近85%内存使用率的东西。 我的问题是,我是否误解了警告消息?是

  • 我有一个听Kafka的Flink过程。然后,将消耗的消息保存在并发哈希映射中一段时间,然后需要将其放入cassandra。 操作员链类似于 在使用EventTime时,我有一些疑问 消息将具有唯一的id、时间戳和其他一些属性。一分钟内可能有一百万个唯一的钥匙。keyBy操作会影响性能吗 我需要涵盖以下场景 > ID为1的X消息在8小时1分1秒到达 ID为2的Y消息在8小时1分4秒到达 由于我使用I

  • 我正在我的GTX 1060 6gb上使用Python中的Tensorflow 1.2训练LSTM。 在每个时代,我用这种方法保存模型: 一切正常,但在九个时代之后,当我试图用这种方法保存模型时,我得到了ResourceExhaustedError。 我在培训期间检查了我的资源,但没有耗尽任何资源。 我得到的错误如下: 2017-06-29 12:43:02.865845: W tenstorflo

  • 我试图提取顶级URL并忽略路径。我使用的代码如下: 这个脚本已经运行了一个小时了。当我运行它时,它给出了以下警告: 如果有人能给我一个更快的建议,我将不胜感激,也许是“警告”建议的方法