我们有一个具有 20 个独立管道的流式处理作业,每个管道具有一个或多个 Kafka 主题源。
当我们使用一个新的jar(我又添加了一个管道)重新启动作业,并且AllowNonRestoredState=true时,我们注意到从检查点恢复Operatorstate的奇怪行为。
恢复其运算符状态org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[]-消费者子任务0没有恢复状态。
由于此Kafka消费者偏移量被设置为所有分区的EARLIEST。Flink版本:1.13.0
这是Flink中的已知问题吗?
注意-在某些情况下,我们在两个不同组的管道中有相同的主题。根据我的理解,这不应影响任何州的恢复,因为Kafka联盟州位于管道的每个Kafka消费源上。
由于您没有为运算符显式提供 UID,因此您依赖于自动生成的 UID。只要作业的拓扑保持不变,它们才是稳定的。添加新管道时,这可能会更改以前自动生成的部分或全部 UID,并使该状态不可恢复。
如果您希望确保能够恢复状态,请在所有有状态操作符上设置uid。详见Flink的生产准备清单。
如果您想要显式地设置与当前自动生成的UID相匹配的UID,以便您可以安全地发展作业,那么您可以通过REST API检查正在运行的作业来找到每个操作符的散列UID(每个操作符的vertexId就是它的散列UID)。然后,您可以在代码中的相同操作符上结合使用这些散列uid和< code>setUidHash()。请参见Flink文档中的匹配运算符状态。
主程序正在消费kafka事件,然后过滤- 但是我得到了以下例外: 以下是flink-conf.yaml中的一些配置 任何想法为什么会发生异常以及如何解决问题? 谢谢
我正在检查Flink Sql Table与kafka连接器是否可以在EXACTLY_ONCE模式下执行,我的方法是创建一个表,设置合理的检查点间隔,并在event_time字段上使用简单的翻滚函数,最后重新启动我的程序。 以下是我的详细进度: 1:创建一个Kafka表 2:启动我的 Flink 作业,如下所示配置 3:执行我的sql 如我们所见,翻转窗口间隔为5分钟,检查点间隔为30秒,每个翻转窗
我正在写一个测试flink两步提交的案例,下面是概述。 正是曾经的kafka生产者。是mysql接收器扩展。是mysql接收器扩展,这个接收器偶尔会抛出一个exeption来模拟检查点失败。 当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但Kafka消费者会读取上次成功的偏移量,Kafka生产者会生成消息,即使他在检查点失败之前就这样做了。 在这种情况下,如何避免重复消息? 谢谢你的
我将微服务实现为事件源聚合,而事件源聚合又被实现为Flink FlatMapFunction。在基本设置中,聚合从两个kafka主题读取事件和命令。然后,它将新事件写入第一个主题并处理第三个主题的结果。因此,Kafka充当事件存储。希望这张图能有所帮助: 由于Kafka没有选中点,因此命令可能会被重放两次,而且输出事件似乎也可以在主题中写入两次。 在重复消息的情况下如何恢复状态?聚合是否可以知道其
我有一份flink的工作,它使用Kafka的数据,制作一些无状态平面图,并向Kafka生成数据,这是一份工作量非常小的工作。 例如,在作业需要从检查点还原之前,它通常会无问题地获取检查点,而它只是无法使用下面的堆栈跟踪还原状态。 状态非常小,我相信它只是Kafka偏移量,它至少运行了一次语义。 所有操作员都有。uid()集,我完全没有主意了。 这是尝试从检查点重新启动时的错误: 任务管理器在正常操
版本flink 1.7 我正在尝试从保存点(或检查点)还原flink作业,该作业所做的是读取kafka的内容- 我使用rocksdb和启用的检查点。 现在我尝试手动触发一个保存点。每个聚合的预期值为30(1个数据/每分钟)。但是当我从保存点(flink run-d-s{url})恢复时,聚合值不是30(小于30,取决于我取消flink作业和恢复的时间)。当作业正常运行时,它得到30。 我不知道为什