当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:如何将自定义逻辑应用于后期事件?

万坚壁
2023-03-14

尽管Flink有一些内置的工具来处理延迟数据,比如允许延迟,但我想自己处理延迟数据。例如,我想监控延迟事件或将它们保存到数据库中。

我该怎么做?

共有2个答案

艾嘉石
2023-03-14

ProcessFunctions(ProcessFunctionKeyedProcessFunction等)通过Context对象提供对记录的事件时间戳和TimerService的访问。TimerService提供对当前水印的访问。

您可以通过比较事件时间戳和水印来识别延迟记录。如果时间戳小于或等于水印,则事件延迟。

如何处理迟到的事件取决于您自己。您可以标记它们,可以丢弃它们,通过侧面输出发射它们,或者使用它们执行任何类型的计算。

尉迟卓
2023-03-14

通常在窗口操作符中使用延迟和水印。如果您使用的是window操作符,则可以像这样使用sideoutput:

val windowStream = eventStream.keyBy(output => output.rule)
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
  .sideOutputLateData(lateOutputTag)

并从侧输出中获取后期元素,如下所示:

windowStream.getSideOutput(lateOutputTag).print()
 类似资料:
  • 问题内容: 我尝试实现JPA自定义存储库。 我有一个这样的过滤器对象: 从前端,我根据用户输入创建FilterPatient的实例。 因此,例如,用户可以值姓和cf属性或值和姓,等等。 我想实现一个自定义存储库,如下所示: 题: 根据用户输入,我必须执行其他查询,那么如何管理存储库?我必须编写查询方法以涵盖输入字段的不同组合,并且必须在服务中编写有关方法存储库调用的逻辑?或者我可以更好地自定义方法

  • 我尝试实现一个JPA自定义存储库。 我有一个过滤器对象,如下所示: 在前端,我根据用户输入创建了一个FilterPatient实例。 按用户分类的示例查询 和其他可能的配置,例如,在FilterPatient的cf属性为NULL中,查询将变为:

  • 我正在连接Azure SQL数据库,下一个任务是在连接失败时创建自定义重试逻辑。我希望重试逻辑在启动时(如果需要)以及应用程序运行时出现连接故障时都能运行。我做了一个测试,从我的应用程序中删除了IP限制,然后导致我的应用程序出现异常(例外)。我想处理引发异常的时间,以便触发一个作业,验证应用程序和服务器是否正确配置。我正在寻找一个解决方案,在那里我可以处理这些异常并重试DB事务? 数据源配置 应用

  • 常规的Spring-Cloud-Stream函数如下所示(摘自文档): 考虑到不使用反应式方法,我想知道是否可以基于自定义逻辑进行不同的转换和/或将结果发送到不同的“输出”绑定?类似这样: 此外,这里我们有相同的返回类型(字符串),但可能需要从每个分支返回不同类的对象(通过使用对象作为整个函数的返回类型)。 我可以用而不是想象一个解决方案,在其中我们进行不同的调用,但也许可以用做同样的事情?

  • 在这一章中,您将学习如何添加JavaScript逻辑到你的nativescript APP,你会使用构建 NativeScript 框架的基本模式, MVVM ,或是“ Model View ViewModel ”。下面是这些词的意思: Model(模型):模型定义和表示数据。将模型从各种可能使用的视图中分离出来,就可以实现代码重用。 View(视图):视图代表UI,在 NativeScript

  • 我刚刚读了Hystrix指南,正试图把我的头缠绕在默认断路器和恢复周期是如何操作的,然后是如何定制它们的行为。 显然,如果电路跳闸,Hystrix会自动调用该命令的方法;这一点我很理解。但是,首先要根据什么标准来使电路跳闸呢?理想情况下,我想尝试多次击打支持服务(例如,最多3次),然后我们才会认为该服务是脱机/不健康的,并跳闸断路器。我该如何实现?在哪里实现? null 所以我猜我的下一个问题部分