当前位置: 首页 > 知识库问答 >
问题:

理解flink保存点

邢和光
2023-03-14

考虑使用以下管道的Apache Flink流媒体应用程序

Kafka-Source -> flatMap 1 -> flatMap 2 -> flatMap 3 -> Kafka-Sink

其中每个flatMap函数都是非状态运算符(例如数据流的正常.flatMap函数)。

检查点/保存点如何工作,以防传入消息在flatMap 3处挂起?从flatMap 1开始重新启动后,是否会重新处理该消息,还是会跳到flatMap 3

我有点困惑,因为文档中似乎提到应用程序状态是我可以在有状态运算符中使用的,但我的应用程序中没有有状态运算符。是否保存“处理进度”

这和失败有区别(-

我试着通过放置线程来找到自己(使用一次启用检查点和rocksdb后端)。sleep()flatMap 3中,然后使用保存点取消作业。然而,这导致flink命令行工具一直挂起,直到睡眠结束,甚至在作业取消之前执行flatMap 3,甚至发送到接收器。因此,我似乎无法手动强迫这种情况来分析Flink的行为。

如果如上所述,检查点/保存点未保存/覆盖“处理进度”,我如何确保在重启/故障情况下,任何给定的操作员(flatmap 1/2/3)都不会重新处理到达管道的每条消息?


共有1个答案

汝承载
2023-03-14

执行检查点时,每个任务(操作符的并行实例)都检查其状态。在您的示例中,三个flatmap操作符是无状态的,因此没有要检查的状态。Kafka源是有状态的,检查所有分区的读取偏移量。

如果出现故障,则恢复作业并加载所有任务的状态,这意味着在源操作符的情况下,将重置读取偏移。因此,应用程序将重新处理自上次检查点以来的所有事件。

为了实现端到端完全一次,您需要一个特殊的接收器连接器,它提供事务支持(例如,对于Kafka)或支持幂等写入。

 类似资料:
  • 下面是我对Flink的疑问。 我们可以将检查点和保存点存储到RockDB等外部数据结构中吗?还是只是我们可以存储在RockDB等中的状态。 状态后端是否会影响检查点?如果是,以何种方式? 什么是状态处理器API?它与我们存储的保存点和检查点直接相关吗?状态处理器API提供了普通保存点无法提供的哪些额外好处? 对于3个问题,请尽可能描述性地回答。我对学习状态处理器API很感兴趣,但我想深入了解它的应

  • 如果Flink应用程序在发生故障或更新后正在启动备份,那么不明确属于KeyedState或OperatorState的类变量是否会持久化? 例如,Flink的留档中描述的BoundedOutOfOrdernessGenerator有一个电流最大时间戳变量。如果更新了Flink应用程序,电流最大时间戳中的值是否会丢失,或者是否会写入在应用程序更新之前创建的保存点? 这样做的真正原因是我想实现一个自定

  • 我们目前正在kubernetes上运行flink,作为使用这个helm模板的作业集群:https://github.com/docker-flink/examples/tree/master/helm/flink(带有一些添加的配置)。 如果我想关闭集群,重新部署新映像(由于应用程序代码更新)并重新启动,我将如何从保存点进行恢复? jobManager命令严格设置在standalone-job.s

  • 版本flink 1.7 我正在尝试从保存点(或检查点)还原flink作业,该作业所做的是读取kafka的内容- 我使用rocksdb和启用的检查点。 现在我尝试手动触发一个保存点。每个聚合的预期值为30(1个数据/每分钟)。但是当我从保存点(flink run-d-s{url})恢复时,聚合值不是30(小于30,取决于我取消flink作业和恢复的时间)。当作业正常运行时,它得到30。 我不知道为什

  • 我想使用Rocksdb状态后端在Flink中保持大约2TB的状态。我将使用增量检查点,因此它将大大减少检查点时间。 但我有时不得不更改代码,例如重新缩放、错误修复、添加新的过滤器/映射、添加新的源/汇等。 所有这些都会影响作业拓扑。当状态发生任何变化时,我可以再次引导状态。但其他时候,引导状态可能很困难,因为这意味着我浪费时间。 在这种情况下,我必须采取一个保存点来重新开始我的工作。当作业运行时,

  • 在javadoc中有一个类模式中的代码示例,我不理解它的概念。 编译方法是静态的,并保存到类模式的对象中。这是如何工作的? 方法匹配器从实例p的对象调用,并存储到类型匹配器的变量中。这是如何工作的?