当前位置: 首页 > 知识库问答 >
问题:

使用Kafka作为EventStore时恢复Flink中的状态一致性

丰胤运
2023-03-14

我将微服务实现为事件源聚合,而事件源聚合又被实现为Flink FlatMapFuncthtml" target="_blank">ion。在基本设置中,聚合从两个kafka主题读取事件和命令。然后,它将新事件写入第一个主题并处理第三个主题的结果。因此,Kafka充当事件存储。希望这张图能有所帮助:

  RPC Request                              RPC Result
  |                                                 |
  ~~~~> Commands-|              |---> Results ~~~~~~|
                 |-->Aggregate--|
  ~> Input evs. -|              |---> output evs. ~~~
  |                                                 |
  ~~~~~<~~~~~~~~~~~<~~~feedbak loop~~~~~<~~~~~~~~<~~~

由于Kafka没有选中点,因此命令可能会被重放两次,而且输出事件似乎也可以在主题中写入两次。

在重复消息的情况下如何恢复状态?聚合是否可以知道其输入流何时是最新的以开始处理命令?

我想到了几种解决方案:

>

如果在2中无法从Flink中删除事件,stateful源可能会从偏移量中读取事件并尝试匹配聚合中的重复事件并将其删除。此选项似乎不健壮,因为在某些情况下补丁不是确定性的,并且会受到缺陷的影响,因为应该为每个聚合和拓扑重新考虑它,并且它不会保证恢复(例如,在连续重启的情况下)。因此,这是一个糟糕的解决方案。

这是一种不同的方法。它将创建一个带有两个特殊水印的特殊KafkaSource:第一个,KafkaSourceStartedWatermark,将始终在源启动时发送,以通知从属操作员。发送此水印时,源会在内部记录当前Kafka偏移量。第二个是KafkaSourceUpToDateWatermark,当到达偏移量时由源发送。这些水印将沿着拓扑透明地传播。操作员应该能够处理这些水印,实现一个特殊的水印通知接口。然后,聚合将能够缓冲或删除RPC命令,直到它在每个输入源中都是最新的。

interface WatermarkNotifiable  {
    void started(String watermarkId);//KafkaSourceStartedWatermark watermark
    void upToDate(String watermarkId);//KafkaSOurceUpToDateWatermark watermark
}  

如果无法实现3中的基础设施,KafkaSource可以实现一个构造函数,指定一个特殊的水印事件,该事件可以传递给运营商,但这将要求所有运营商依赖于这些水印,然后重新发布。

其他不同的方法是不处理比标准更早的命令。例如,命令有一个入口时间戳。如果使用时间,时间同步至关重要。

  1. 将Kafka用作(CQRS)事件存储。好主意
  2. Kafka-了解消费者是否是最新的
  3. Kafka

共有1个答案

韩明德
2023-03-14

创建新的Conmuter运算符类型。这就像一个来源。它由多个表示事件和命令主题的源组成。它开始于“恢复”状态。在此状态下,它读取事件主题的最新内容。同时,对于命令,它存储或删除它们。一旦更新,它将考虑恢复并“打开”命令的方式。它可以作为一个源和一个操作符单独实现。

FlinkKafkaProducerXX不足以做到这一点,但它将是实现它的基础。

 类似资料:
  • 我们有一个具有 20 个独立管道的流式处理作业,每个管道具有一个或多个 Kafka 主题源。 当我们使用一个新的jar(我又添加了一个管道)重新启动作业,并且AllowNonRestoredState=true时,我们注意到从检查点恢复Operatorstate的奇怪行为。 我们当前用于添加管道的配置是静态的,我们基本上正在更改代码以添加任何新管道。 我们没有为任何运算符设置任何UID。 当我们从

  • 我正在K8上运行flink cluster,状态约为1TB。 我面临的问题之一是获取保存点并恢复作业。现在,这些更新有时是简单的代码更新,而不是并行性更改。但是获取保存点然后用旧状态恢复新作业的时间相当长。 是否有方法对作业进行就地更新,以使本地状态和作业ID不发生更改,从而避免执行保存点恢复所需的时间?

  • 我使用Apache Flink 1.9和标准检查点/保存点机制来FS。 我的问题是:如果作业的代码发生了更改,从保存点恢复作业的正确方法是什么?例如,在重构之后,我重命名了几个类,之后我无法从旧的检查点恢复。 我丢失了我的数据,想问-在这种情况下我能做些什么? 所有运算符都有uid和name

  • 主程序正在消费kafka事件,然后过滤- 但是我得到了以下例外: 以下是flink-conf.yaml中的一些配置 任何想法为什么会发生异常以及如何解决问题? 谢谢

  • 我在工作中使用ProcessWindowFunction并保持StateValue。我的目标是将值保持在超过1个窗口的状态,这意味着状态不会在每个窗口的末尾被清除。我有两个问题: 我怎样才能清除状态?有没有设置触发器并用它来清除状态的选项?(当在ProcessFunction中使用状态时,我能够设置触发器以执行此清除,即使没有新事件) 有没有一种方法来构建一个单元测试来检查我的ProcessWin

  • 我试图理解使用片段保存和恢复状态的过程。我用它创建了滑动导航菜单。 其中一个片段中有以下代码: 例如,我想在用户退出片段之前保存复选框的状态,并在再次创建片段时恢复它。如何实现这一点? 编辑: 根据raxellson的回答,我将片段更改为: 我已被记录,因此未保存savedInstanceState。我做错了什么?