当前位置: 首页 > 知识库问答 >
问题:

如何配置Flink以使用S3进行后端状态和检查点

傅树
2023-03-14

我有一个FlinkV1.2的设置,3个JobManager,2个TaskManager。对于后端状态和检查点以及zookeeper storageDir,我想使用S3桶而不是hdfs

2017-03-22 09:52:40,971 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,981 WARN  org.apache.hadoop.security.UserGroupInformation               - PriviledgedActionException as:ubuntu (auth:SIMPLE) cause:java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
2017-03-22 09:52:40,981 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,983 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2017-03-22 09:52:40,993 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.

我没有安装hadoop。不确定是否需要这样做,以及是否需要这样做,应该如何/在哪里安装/配置它?

编辑:在使用以下hadoop xml(core-site.xml)配置Flink之后,我并不真正理解IAM部分,而且我也没有使用EMR,我自己(在AWS中)安装了集群,以便能够更新Flink,而不依赖于映像:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>s3://[ bucket ] </value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <description>[ Access Key ]</description>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <description>[ Secret Key ]</description>
    </property>

    <property>
        <name>fs.s3.awsAccessKeyId</name>
        <description>[ Access Key ]</description>
    </property>

    <property>
        <name>fs.s3.awsSecretAccessKey</name>
        <description>[ Secret Key ]</description>
    </property>

    <property>
        <name>fs.s3n.awsAccessKeyId</name>
        <value>[ Access Key ]</value>
    </property>

    <property>
        <name>fs.s3n.awsSecretAccessKey</name>
        <value>[ Secret Key ]</value>
    </property>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
</configuration>

我得到这个错误:

  2017-03-24 11:20:17,760 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
        at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
        at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
        at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:303)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

编辑:我的错误我在description字段中设置了关键字,而不是Value。

共有1个答案

莘翰采
2023-03-14

请查看使用S3运行Flink的指南,了解如何设置S3。

我认为您缺少的是带有fs.s3.impl配置键的hadoop配置文件。即使您没有使用Hadoop,仍然需要使用Hadoop配置文件。

 类似资料:
  • 我有一个FlinkV1.2的设置,3个JobManager,2个TaskManager。我想将hdfs用于后端状态和检查点以及zookeeper storageDir 在JobManager日志中 Hadoop作为单个节点集群安装在我在Settings中设置的VM上。为什么Flink要求配置额外的参数?(顺便说一句,官方文件中没有这些内容)

  • 我可以在文档中看到: Flink目前只为没有迭代的作业提供处理保证。对迭代作业启用检查点会导致异常。为了在迭代程序上强制检查点,用户需要在启用检查点时设置一个特殊的标志:env.enablecheckpointing(interval,force=true)。 如果是一个而不是一个(这意味着它也可以保存状态),会有什么变化吗?

  • 在广播模式的文档中,提到没有RocksDB状态后端: 如果应用程序使用rocksdb作为状态后端,这将如何影响保存点行为?这是否意味着状态在保存点期间不存储,因此不会恢复?

  • 我有以下CEP PatternStream,其中数据流是基于实体ID分区的,因为只有实体具有相同的实体ID时,我才对模式匹配感兴趣: 但随后我注意到检查点状态大小随着实体ID数量的增加而增加。如果我对检查点的理解是正确的,这是意料之中的,因为运算符状态的数量会增加。但我想弄清楚是否有其他方法可以最小化检查点状态大小。 > 有没有不同的方法来实现这种模式匹配,而不根据实体ID对数据流进行分区?

  • 嗨,我需要检查siteminder的状态,看看它是否已关闭。我当前的解决方案是在指定的环境中ping siteminder的策略服务器。是否有任何方法可以使用c#检查Siteminder是否正常运行?提前感谢

  • 我将Rocksdb设置为状态后端的位置存在太多空chk-*文件 我正在使用FlinkKafkaConsumer从Kafka主题获取数据。我使用RocksDb作为状态后端。我正在打印从Kafka那里收到的信息。以下是我必须设置状态后端的属性: 在flink-conf.yaml我还设置了这个属性: 我使用的是简单的单节点flink集群(使用./start cluster.sh)。我启动了该作业并使其运