当前位置: 首页 > 知识库问答 >
问题:

如何从检查点恢复Flink Sql作业?

常雅珺
2023-03-14

我正在检查Flink Sql Table与kafka连接器是否可以在EXACTLY_ONCE模式下执行,我的方法是创建一个表,设置合理的检查点间隔,并在event_time字段上使用简单的翻滚函数,最后重新启动我的程序。

以下是我的详细进度:

1:创建一个Kafka表

CREATE TABLE IF NOT EXISTS LOG_TABLE(
   id String,
   ...
   ...
   event_timestamp timestamp(3), watermark for event_timestamp as ....
) 

2:启动我的 Flink 作业,如下所示配置

StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        environment.getCheckpointConfig().setCheckpointInterval(30000L);
        environment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        environment.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/checkpoint/"));
        environment.setStateBackend(new HashMapStateBackend());
        environment.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

TableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, settings);
tableEnvironment.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
        tableEnvironment.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");

3:执行我的sql

select tumble_end(event_timestamp, interval '5' minute), 
       count(1) 
       from LOG_TABLE 
       group by tumble(event_timestamp, interval '5' minute)

如我们所见,翻转窗口间隔为5分钟,检查点间隔为30秒,每个翻转窗口触发6个检查点。

在这种情况下,窗口状态丢失:

  1. 下午2:00:00,午餐作业,发送100条消息。(作业id为bd208afa6599864831f008d429a527bb,chk1-3触发成功,检查点dir创建检查点文件)
  2. 下午2:01:40,关闭我的工作,修改Checkpoint Storage目录为 /tmp/checkpoint/bd208afa6599864831f008d429a527bb/chk-3
  3. 下午2:02:00,重新启动作业并发送另一条100消息。

所有消息都在2分钟内发送,因此从检查点重新启动后,作业输出应为200,但结果为100,作业失去了第一个作业的状态。我的进度有什么错误吗?请帮忙查一下,谢谢。

共有1个答案

饶滨海
2023-03-14

在保留精确一次保证的同时重新启动Flink作业需要以特殊方式启动后续作业,以便新作业从恢复上一个作业的状态开始。(修改检查点存储目录,如您在步骤2中所做的那样,没有帮助。)

如果使用 SQL 客户端启动作业,请参阅从保存点启动 SQL 作业,这涉及执行类似如下操作

SET 'execution.savepoint.path' = '/tmp/flink-savepoints/...';

在启动需要恢复状态的查询之前。

如果您使用的是表API,那么详细信息取决于您如何启动作业,但是您可以使用命令行执行以下操作

$ ./bin/flink run \
      --detached \ 
      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
      ./examples/streaming/StateMachineExample.jar

或者您可能正在使用REST API,在这种情况下,您将向< code>/jars/:jarid/run发送配置了< code > save path 的内容。

请注意,您可以使用保留的检查点而不是保存点来重新启动或重新缩放作业。另请注意,如果更改查询的方式使旧状态与新查询不兼容,则这些都不起作用。有关该主题的更多信息,请参阅 FLIP-190。

Flink Operations Playground 是一个教程,更详细地介绍了此主题和相关主题。

 类似资料:
  • 我有一份flink的工作,它使用Kafka的数据,制作一些无状态平面图,并向Kafka生成数据,这是一份工作量非常小的工作。 例如,在作业需要从检查点还原之前,它通常会无问题地获取检查点,而它只是无法使用下面的堆栈跟踪还原状态。 状态非常小,我相信它只是Kafka偏移量,它至少运行了一次语义。 所有操作员都有。uid()集,我完全没有主意了。 这是尝试从检查点重新启动时的错误: 任务管理器在正常操

  • 1)以上假设是否正确。2)当发生故障时,滚动窗口有状态是否有意义,我们从最后一个kafka分区提交的偏移量开始。3)当滚动窗口有状态时,这个状态什么时候可以被flink使用。4)为什么检查点和保存点的状态大小不同。5)当发生故障时,flink总是从sorce运算符开始。对吗?

  • 我正在使用KCL(v2)将Kafka消费者转换为AWS动觉消费者。在Kafka中,偏移量用于帮助消费者跟踪其最近使用的消息。如果我的Kafka应用程序死机,它将使用重新启动时停止的偏移量。 然而,这在Kinesis中是不一样的。我可以设置,但唯一的参数是、或。如果我的Kinesis应用程序死机,它将不知道重新启动时从哪里恢复消费。 我的KCL消费者非常简单。方法如下所示: 而RecordProce

  • 目标:从Kinesis读取数据,并通过火花流将数据以拼花格式存储到S3 情况:应用程序最初运行良好,批量运行1小时,平均处理时间不到30分钟。出于某种原因,假设应用程序崩溃,我们尝试从检查点重新启动。现在,处理过程需要很长时间,不会向前推进。我们尝试以1分钟的分批间隔测试相同的东西,处理运行良好,分批完成需要1.2分钟。当我们从检查点恢复时,每批大约需要15分钟 注意:我们使用s3作为检查点,使用

  • 我们目前正在kubernetes上运行flink,作为使用这个helm模板的作业集群:https://github.com/docker-flink/examples/tree/master/helm/flink(带有一些添加的配置)。 如果我想关闭集群,重新部署新映像(由于应用程序代码更新)并重新启动,我将如何从保存点进行恢复? jobManager命令严格设置在standalone-job.s

  • 所以我有一个我似乎无法解决的问题。我有一个ViewPager在我的一个活动中,比如MainActivity。我正在实现所有必要的方法,以保存和检索实例状态时,活动被取消在后台。但是当活动试图恢复它的状态时,片断会被恢复,但它们并没有附加到viewpager,所以我得到的只是一个白色屏幕。 下面是相关代码: mainactivity.java ViewPagerAdapter.java 因此,如果我