当前位置: 首页 > 知识库问答 >
问题:

Flink Kinesis Consumer未存储上次成功处理的序列号

濮泳
2023-03-14

我们正在使用Flink Kinesis消费者将Kinesis流中的数据消费到我们的Flink应用程序中。

KCL库使用DynamoDB表存储上次成功处理的Kinesis流序列号。这样,下次应用程序启动时,它将从停止的位置恢复。

但是,Flink Kinesis消费者似乎在任何持久性存储中都不维护任何这样的序列号。因此,我们需要依赖ShardIteratortype(trim\u horizen、latest等)来决定在应用程序重新启动时恢复Flink应用程序处理的位置。

一个可能的解决方案是依赖Flink检查点机制,但这仅在应用程序失败后恢复时有效,而不是在应用程序被故意取消并且需要从上次成功使用的Kinesis流序列号重新启动时有效。

我们需要自己存储这些最后成功消耗的序列号吗?

共有2个答案

通俊发
2023-03-14

为了补充David的回答,我想解释一下不存储序列号背后的原因。

提交到源系统中的任何类型的偏移量都会将检查点/保存点功能限制在容错范围内。也就是说,只有最新的检查点/保存点才能恢复。

然而,Flink实际上支持跳回以前的检查点/保存点。考虑升级应用程序。您可以在升级之前创建一个保存点,并让它运行几分钟,在那里创建几个检查点。然后,您会发现一个关键的bug。您希望回滚到已获取的保存点并放弃所有检查点。

现在,如果Flink只将源偏移提交给源系统,那么从现在起到恢复的保存点之间,我们将无法重播数据。因此,正如David指出的那样,Flink需要将偏移量存储在保存点本身中。此时,额外提交到源系统并不会带来任何好处,而且在恢复到以前的保存点/检查点时会造成混乱。

您认为额外存储偏移有什么好处吗?

王骏
2023-03-14

Flink的最佳实践是使用检查点和保存点,因为这些检查点和保存点会创建一致的快照,其中包含消息队列中的偏移量(在本例中,是运动流序列号),以及作业图其余部分中的所有状态,这些状态是由于消耗了这些偏移量之前的数据。这使恢复或重新启动而不丢失或重复数据成为可能。

Flink的检查点是Flink为从故障中恢复而自动拍摄的快照,其格式针对快速恢复进行了优化。保存点使用相同的底层快照机制,但手动触发,其格式更关注操作灵活性而非性能

保存点是您需要的。特别是,使用保存点取消和从保存点恢复非常有用。

另一个选项是将保留的检查点与ExternalizedCheckpointCleanup一起使用。RETAIN\u ON\u取消。

 类似资料:
  • 在web.xml中,我有以下内容

  • 问题内容: 我正在创建一个返回Dictionary的,但是当我在另一个类中调用此方法时,其值为nil。 然后,我尝试在另一个类中调用它,以便可以查看Dictionary 的值并基于该值显示数据。例如, 这里的问题是,当我调用函数时,我在尝试调用字典时收到一个值。 简而言之,我的问题是如何为完成处理程序分配值并在Xcode项目中的其他位置调用它? 问题答案: 您的问题很乱。您的示例代码无效,并且没有

  • 有没有一种方法可以添加一个登录成功处理程序使用spall-oaut2? 我尝试使用基本身份验证过滤器,但它只过滤客户端凭据,而不是用户凭据。 还是需要创建自定义用户身份验证管理器? 蒂亚

  • 本文向大家介绍HTML5 / JS存储事件处理程序,包括了HTML5 / JS存储事件处理程序的使用技巧和注意事项,需要的朋友参考一下 仅当存储事件由另一个窗口触发时,才会触发存储事件处理程序。您可以尝试运行以下代码:

  • 有没有办法用Google App Engine(Java)在Google Cloud Storage中创建一个带有签名URL的处理程序? 我可以成功地上传文件到buckets与签名的URL在我的Android(Java)应用程序,但我没有目前的方式执行额外的代码服务器端上传完成。

  • 我正在尝试使用spring-kafka版本2.3.0编写kafka消费者。M2库。为了处理运行时错误,我使用SeekToSumtErrorHandler.class和DeadLetterPublishingRecoverer作为我的恢复器。这仅在我的消费者代码抛出异常时才正常工作,但在无法反序列化消息时失败。 我尝试自己实现ErrorHandler,我很成功,但使用这种方法,我自己最终编写了DLT