当前位置: 首页 > 知识库问答 >
问题:

关于Flink外部化检查点的两个问题

华凯捷
2023-03-14

我有两个关于Flink外部化检查站的问题

(Q1)我可以在flink-conf.yaml中设置“state.checkpoints.dir”,以使外部化的检查点正常工作,但当我从IDE运行flink时,如何实现同样的效果呢?我尝试了中提到的全局配置方法(http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html)但运气不好。我就是这样做的:

Configuration cfg =
                GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

这是IDE中显示的错误消息:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
    ... 19 more

Process finished with exit code 1

(Q2)在检查站文件中(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html),它说“这样,如果您的工作失败,您将有一个检查点可以恢复。”,取消的工作怎么样?新作业是继续使用现有的检查点,还是从自己的检查点开始?

共有3个答案

慕宏儒
2023-03-14

重新从Eclipse设置检查点目录,通常我只是在设置要使用的后端时这样做,例如。

env.setStateBackend(new FsStateBackend(options.getCheckpointDir()));

重新取消作业-检查点目录被删除。如果您想在停止(取消)作业后从已知状态恢复,您需要执行保存点。

袁泰
2023-03-14

第一个问题,使用自定义配置创建本地环境:

val conf = new Configuration()
conf.setString(CoreOptions.CHECKPOINTS_DIRECTORY, "file:///user/flink/checkpoint/storing/")
StreamExecutionEnvironment.createLocalEnvironment(4, conf)

第二个问题,正如大卫所说:

config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
陶文林
2023-03-14

您可以控制取消作业时是否删除外部化的检查点。如果要保留它们,可以执行以下操作:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

有关详细信息,请参阅文档。

要从外部化的检查点恢复,需要执行以下操作(与从保存点恢复相同):

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 类似资料:
  • 下面是我对Flink的疑问。 我们可以将检查点和保存点存储到RockDB等外部数据结构中吗?还是只是我们可以存储在RockDB等中的状态。 状态后端是否会影响检查点?如果是,以何种方式? 什么是状态处理器API?它与我们存储的保存点和检查点直接相关吗?状态处理器API提供了普通保存点无法提供的哪些额外好处? 对于3个问题,请尽可能描述性地回答。我对学习状态处理器API很感兴趣,但我想深入了解它的应

  • 我们可以使用readResolve和WriteReplace方法来指定可外部化实例和可序列化实例的替换对象…… 只是想知道几件事: 1)对于可序列化的类,readObject(ObjectInputStream is)和writeObject(ObjectOutputStream OS)方法可以与readResolve()和writeReplace()结合使用吗?是否可以替换对象,然后使用read

  • 源在内存中创建任意数量的事件,每秒吞吐量为1个事件。每个事件都有用于分区流的唯一id(使用keyBy运算符),并通过映射函数向托管状态(使用ValueState)添加约100KB。然后将事件简单地传递给不执行任何操作的接收器。 使用上面描述的设置,我们发送了1200个事件,检查点间隔和最小暂停设置为5秒。当事件以恒定的速度和相等的状态量出现时,我们期望检查点的大小或多或少是恒定的。然而,我们观察到

  • 我正在kubernetes中运行一个Flink作业。我的设置如下 1个作业管理器吊舱 我的flink作业从kafka源获取时间序列数据(时间、值),进行聚合和其他转换,并将其发布到kafka接收器。 有时,我的作业因检查点异常(10分钟后超时)而失败,主要是由一名操作员完成的。我不理解异步持续时间(在图中)的含义,为什么它花费的时间最长。在这个异常之前,Kafka的吞吐量非常高,有500-800万

  • 我们正在尝试使用RocksDB后端设置Flink有状态作业。我们使用会话窗口,有30分钟的间隔。我们使用aggregateFunction,所以不使用任何Flink状态变量。通过采样,我们的事件数不到20k次/秒,新会话数不到20-30次/秒。我们的会议基本上收集了所有的事件。会话累加器的大小会随着时间而增大。我们总共使用了10G内存和Flink1.9,128个容器。以下是设置: 从我们对给定时间

  • 我有以下CEP PatternStream,其中数据流是基于实体ID分区的,因为只有实体具有相同的实体ID时,我才对模式匹配感兴趣: 但随后我注意到检查点状态大小随着实体ID数量的增加而增加。如果我对检查点的理解是正确的,这是意料之中的,因为运算符状态的数量会增加。但我想弄清楚是否有其他方法可以最小化检查点状态大小。 > 有没有不同的方法来实现这种模式匹配,而不根据实体ID对数据流进行分区?