当前位置: 首页 > 知识库问答 >
问题:

使用RocksDb作为状态后端的Flink检查点的空chk-*目录太多

淳于星宇
2023-03-14

我将Rocksdb设置为状态后端的位置存在太多空chk-*文件

我正在使用FlinkKafkaConsumer从Kafka主题获取数据。我使用RocksDb作为状态后端。我正在打印从Kafka那里收到的信息。以下是我必须设置状态后端的属性:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50);
        env.getCheckpointConfig().setCheckpointTimeout(60);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
StateBackend rdb = new RocksDBStateBackend("file:///Users/user/Documents/telemetry/flinkbackends10", true);
env.html" target="_blank">setStateBackend(rdb);
env.execute("Flink kafka");

在flink-conf.yaml我还设置了这个属性:

state.checkpoints.num-retained: 3

我使用的是简单的单节点flink集群(使用./start cluster.sh)。我启动了该作业并使其运行了1个小时,我看到在/用户/用户/文档/遥测/flinkbackends10位置下创建的chk-*文件太多

chk-10      chk-12667   chk-18263   chk-20998   chk-25790   chk-26348   chk-26408   chk-3       chk-3333    chk-38650   chk-4588    chk-8       chk-96
chk-10397   chk-13      chk-18472   chk-21754   chk-25861   chk-26351   chk-26409   chk-30592   chk-34872   chk-39405   chk-5       chk-8127    chk-97
chk-10649   chk-13172   chk-18479   chk-22259   chk-26216   chk-26357   chk-26411   chk-31097   chk-35123   chk-39656   chk-5093    chk-8379    chk-98
chk-1087    chk-14183   chk-18548   chk-22512   chk-26307   chk-26360   chk-27055   chk-31601   chk-35627   chk-4       chk-5348    chk-8883    chk-9892
chk-10902   chk-15444   chk-18576   chk-22764   chk-26315   chk-26377   chk-28064   chk-31853   chk-36382   chk-40412   chk-5687    chk-9       chk-99
chk-11153   chk-15696   chk-18978   chk-23016   chk-26317   chk-26380   chk-28491   chk-32356   chk-36885   chk-41168   chk-6       chk-9135    shared
chk-11658   chk-16201   chk-19736   chk-23521   chk-26320   chk-26396   chk-28571   chk-32607   chk-37389   chk-41666   chk-6611    chk-9388    taskowned
chk-11910   chk-17210   chk-2       chk-24277   chk-26325   chk-26405   chk-29076   chk-32859   chk-37642   chk-41667   chk-7       chk-94
chk-12162   chk-17462   chk-20746   chk-25538   chk-26337   chk-26407   chk-29581   chk-33111   chk-38398   chk-41668   chk-7116    chk-95

其中只有chk-41668、chk-41667、chk-41666有数据。其余目录为空。

这是预期的行为。如何删除那些空目录?是否有删除空目录的配置?

共有1个答案

雍兴修
2023-03-14

在这里回答我自己的问题:

在UI中,我在检查点部分看到“完成前检查点已过期”错误。并发现要解决此错误,我们需要增加检查点超时。

我将超时从60增加到500,它开始删除空的chk-*文件。

env.getCheckpointConfig().setCheckpointTimeout(500);
 类似资料:
  • 在广播模式的文档中,提到没有RocksDB状态后端: 如果应用程序使用rocksdb作为状态后端,这将如何影响保存点行为?这是否意味着状态在保存点期间不存储,因此不会恢复?

  • 我的Flink作业从kafka主题读取并将数据存储在RocksDB状态后端,以利用可查询状态。我能够在本地机器中运行作业并查询状态。但是在集群上部署时,我收到以下错误: 我已经尝试在集群级别和作业级别设置rocksDB状态后端。当它设置为作业级别时,我已将其作为阴影依赖项提供。我也尝试在主机集群机器上编译代码。我在所有情况下都会得到相同的错误。 如何解决此错误?

  • 我在同一份flink jobs中读了两个Kafka主题。 :来自第一个主题的消息被保存到rocksdb,然后它将与Stream2联合。 :来自第二个主题的消息被Stream1保存的状态所丰富,然后它将与Stream1联合。 主题1和主题2是不同的来源,但两个来源的输出基本相同。我必须用topic1的数据来充实topic2的数据。 这里是流动; 这里是问题; 那个流量好吗? 可以访问由保存的相同的状

  • 我对闪身是个新手。我正在尝试在我的应用程序中启用检查点和状态。我从Flink文档中看到了我们是如何存储键控状态的。但是我想知道我们是否可以存储非键控状态(的状态)

  • 我有一个FlinkV1.2的设置,3个JobManager,2个TaskManager。对于后端状态和检查点以及zookeeper storageDir,我想使用S3桶而不是hdfs 我没有安装hadoop。不确定是否需要这样做,以及是否需要这样做,应该如何/在哪里安装/配置它? 编辑:在使用以下hadoop xml(core-site.xml)配置Flink之后,我并不真正理解IAM部分,而且我

  • 我有一个FlinkV1.2的设置,3个JobManager,2个TaskManager。我想将hdfs用于后端状态和检查点以及zookeeper storageDir 在JobManager日志中 Hadoop作为单个节点集群安装在我在Settings中设置的VM上。为什么Flink要求配置额外的参数?(顺便说一句,官方文件中没有这些内容)