我试图检查/保存我在EMR上运行的flink状态到AWS上的s3存储桶。请注意:
org.apache.flink.util.FlinkException: Triggering a savepoint for the job 16c162c47f225cddad974056c9494b6d failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint.........
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to trigger savepoint. Decline reason: An Exception occurred while triggering the checkpoint.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
jobmanager日志:
java.io.IOException: Cannot instantiate file system for URI: s3://flink-bc/savepoints
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.initializeLocationForSavepoint(AbstractFsCheckpointStorage.java:147)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpoint(CheckpointCoordinator.java:511)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:370)
at org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:951)
在使用带有s3的最新Flink版本(1.10.0)将检查点存储在s3存储桶中时,我遇到了类似的问题。
因此,请在这里找到我提供的详细工作答案。
下面是我对Flink的疑问。 我们可以将检查点和保存点存储到RockDB等外部数据结构中吗?还是只是我们可以存储在RockDB等中的状态。 状态后端是否会影响检查点?如果是,以何种方式? 什么是状态处理器API?它与我们存储的保存点和检查点直接相关吗?状态处理器API提供了普通保存点无法提供的哪些额外好处? 对于3个问题,请尽可能描述性地回答。我对学习状态处理器API很感兴趣,但我想深入了解它的应
如果Flink应用程序在发生故障或更新后正在启动备份,那么不明确属于KeyedState或OperatorState的类变量是否会持久化? 例如,Flink的留档中描述的BoundedOutOfOrdernessGenerator有一个电流最大时间戳变量。如果更新了Flink应用程序,电流最大时间戳中的值是否会丢失,或者是否会写入在应用程序更新之前创建的保存点? 这样做的真正原因是我想实现一个自定
我知道stackoverflow上也有类似的问题,但在调查了其中几个之后,我知道 > 他们正在使用不同的存储格式 但这些并不是令人困惑的地方,我不知道什么时候该用一个,什么时候该用另一个。 考虑以下两种情况: 如果由于某种原因(例如错误修复或意外崩溃)需要关闭或重新启动整个应用程序,那么我必须使用保存点来恢复整个应用程序
我想使用Rocksdb状态后端在Flink中保持大约2TB的状态。我将使用增量检查点,因此它将大大减少检查点时间。 但我有时不得不更改代码,例如重新缩放、错误修复、添加新的过滤器/映射、添加新的源/汇等。 所有这些都会影响作业拓扑。当状态发生任何变化时,我可以再次引导状态。但其他时候,引导状态可能很困难,因为这意味着我浪费时间。 在这种情况下,我必须采取一个保存点来重新开始我的工作。当作业运行时,
我知道Apache Flink中有三种状态后端:MemoryStateBend、FSStateBend和RockSDBStateBend。 MemoryStateBindend将检查点存储到本地RAM中,FSStateBindend将检查点存储到本地文件系统中,RockSDBStateBindend将检查点存储到RocksDB中。我对RocksDBStateBend有一些问题。 据我所知,Rock
考虑使用以下管道的Apache Flink流媒体应用程序: 其中每个函数都是非状态运算符(例如