当前位置: 首页 > 知识库问答 >
问题:

无法将点/检查点flink状态保存到AWS S3存储桶

宓弘壮
2023-03-14

我试图检查/保存我在EMR上运行的flink状态到AWS上的s3存储桶。请注意:

  • 实例(主节点和核心节点)正确设置了IAM角色,以访问s3 bucket及其内部的所有目录/文件(AmazonS3FullAccess策略附加到该角色,没有任何内容覆盖它)
org.apache.flink.util.FlinkException: Triggering a savepoint for the job 16c162c47f225cddad974056c9494b6d failed.
    at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
    at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
    at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint.........
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) 

jobmanager日志:

java.io.IOException: Cannot instantiate file system for URI: s3://flink-bc/savepoints
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.initializeLocationForSavepoint(AbstractFsCheckpointStorage.java:147)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpoint(CheckpointCoordinator.java:511)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:370)
    at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:951)

共有1个答案

归松
2023-03-14

在使用带有s3的最新Flink版本(1.10.0)将检查点存储在s3存储桶中时,我遇到了类似的问题。

因此,请在这里找到我提供的详细工作答案。

 类似资料:
  • 下面是我对Flink的疑问。 我们可以将检查点和保存点存储到RockDB等外部数据结构中吗?还是只是我们可以存储在RockDB等中的状态。 状态后端是否会影响检查点?如果是,以何种方式? 什么是状态处理器API?它与我们存储的保存点和检查点直接相关吗?状态处理器API提供了普通保存点无法提供的哪些额外好处? 对于3个问题,请尽可能描述性地回答。我对学习状态处理器API很感兴趣,但我想深入了解它的应

  • 如果Flink应用程序在发生故障或更新后正在启动备份,那么不明确属于KeyedState或OperatorState的类变量是否会持久化? 例如,Flink的留档中描述的BoundedOutOfOrdernessGenerator有一个电流最大时间戳变量。如果更新了Flink应用程序,电流最大时间戳中的值是否会丢失,或者是否会写入在应用程序更新之前创建的保存点? 这样做的真正原因是我想实现一个自定

  • 我知道stackoverflow上也有类似的问题,但在调查了其中几个之后,我知道 > 他们正在使用不同的存储格式 但这些并不是令人困惑的地方,我不知道什么时候该用一个,什么时候该用另一个。 考虑以下两种情况: 如果由于某种原因(例如错误修复或意外崩溃)需要关闭或重新启动整个应用程序,那么我必须使用保存点来恢复整个应用程序

  • 我想使用Rocksdb状态后端在Flink中保持大约2TB的状态。我将使用增量检查点,因此它将大大减少检查点时间。 但我有时不得不更改代码,例如重新缩放、错误修复、添加新的过滤器/映射、添加新的源/汇等。 所有这些都会影响作业拓扑。当状态发生任何变化时,我可以再次引导状态。但其他时候,引导状态可能很困难,因为这意味着我浪费时间。 在这种情况下,我必须采取一个保存点来重新开始我的工作。当作业运行时,

  • 我知道Apache Flink中有三种状态后端:MemoryStateBend、FSStateBend和RockSDBStateBend。 MemoryStateBindend将检查点存储到本地RAM中,FSStateBindend将检查点存储到本地文件系统中,RockSDBStateBindend将检查点存储到RocksDB中。我对RocksDBStateBend有一些问题。 据我所知,Rock

  • 考虑使用以下管道的Apache Flink流媒体应用程序: 其中每个函数都是非状态运算符(例如