当前位置: 首页 > 知识库问答 >
问题:

如何验证在Apache Flink中工作的增量检查点?

齐晟
2023-03-14
    null
    import static org.apache.flink.configuration.ConfigOptions.key;
    
    import java.util.Collection;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
    import org.rocksdb.DBOptions;
    import org.rocksdb.InfoLogLevel;
    
    public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
        private static final long serialVersionUID = 1L;
    
        private String dbLogDir = "";
    
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions,
                                         Collection<AutoCloseable> handlesToClose) {
            currentOptions = super.createDBOptions(currentOptions, handlesToClose);
    
            currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
            currentOptions.setStatsDumpPeriodSec(60);
            currentOptions.setDbLogDir(dbLogDir);
    
            return currentOptions;
        }
    
        @Override
        public String toString() {
            return this.getClass().toString() + "{" + super.toString() + '}';
        }
    
        /**
         * Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
         * data directory's absolute path will be used as the log file prefix).
         */
        public void setDbLogDir(String dbLogDir) {
            this.dbLogDir = dbLogDir;
        }
    
        public static final ConfigOption<String> LOG_DIR =
                key("state.backend.rocksdb.log.dir")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the " +
                                "data directory's absolute path will be used as the log file prefix)");
    
        @Override
        public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
            DefaultConfigurableOptionsFactory optionsFactory =
                    super.configure(configuration);
    
            this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
    
            return optionsFactory;
        }
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.enableCheckpointing(interval);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    RocksDBStateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath,true);
    
    DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
    options.setDbLogDir("file:///mnt/flink/storage/rocksdb/logging/");
    
    env.setStateBackend(stateBackend);
    
    stateBackend.setRocksDBOptions(options);
    state.backend.rocksdb.log.dir: "file:///mnt/flink/storage/rocksdb/logging/"
    state.backend.rocksdb.options-factory: com.myflinkcode.common.config.DefaultConfigurableOptionsFactoryWithLog

我浏览了完整的flink仪表板,但我没有得到任何线索,如何检查是增量检查点正在发生还是完全检查点正在发生。请帮助我如何设置RocksDB的日志记录来了解增量检查点是否正在发生。我在文档中看到RocksDB日志记录会在性能和存储方面造成巨大的成本,这是为了测试目的,之后我将禁用它

共有1个答案

慕容耘豪
2023-03-14

我不确定这些信息是否记录或显示在任何地方,但在您的代码中,您可以使用

stateBackend.isIncrementalCheckpointsEnabled()

以确定您的RocksDB状态后端是否启用了检查点,然后自己记录此信息。

注意,要启用增量检查点(默认情况下是关闭的),您需要配置

state.backend.incremental: true
 类似资料:
  • 我有多个Kafka主题(多租户),我运行同一个作业运行多次基于主题的数量,每个作业消耗来自一个主题的消息。我已将文件系统配置为状态后端。 假设有3个作业正在运行。这里的检查站是如何工作的?这3个作业是否都将检查点信息存储在同一路径中?如果任何作业失败,该作业如何知道从何处恢复检查点信息?我们过去常常在向flink集群提交作业时提供作业名称。这和它有什么关系吗?一般来说,Flink如何区分作业及其检

  • 我正在做一个简单的程序,要求用户输入0-19之间的五个数字。我想在每个数字后面添加一些东西(比如if语句),以确保它在这个范围内。如果不是,程序应该说“请再读指令”,然后system.exit(0)。这段代码是相关的: 如有任何帮助,将不胜感激。

  • 问题内容: 我刚刚使用MySQL查询浏览器创建了一个新表,并注意到在“自动增量列”下有一个勾号。这是如何运作的? 以编程方式添加到数据库时,是否仅添加一个数字,然后数据库会自动将该数字递增? 每次有新用户在我的网站上注册时,我都希望他们的客户ID(仅整数)自动递增,因此我不必尝试随机生成一个唯一的数字。 可以简单地完成吗? 谢谢! 问题答案: 以编程方式添加到数据库时,是否仅添加一个数字,然后数据

  • 我正在尝试使用模式验证响应JSON。即使尝试使用错误的模式验证响应,测试用例也会通过。 JSON响应: JSON模式: 我预计这里会出现错误,因为响应不包含所需的served_imsi值。但考验正在通过。

  • 问题内容: 阅读本书中的 Exception时,我发现了以下语句: 被检查的异常由编译器在编译时检查。 和 编译器不会在编译时检查未经检查的异常。 因此,如果我们也可以说或 位于Checked Exceptions类树之下。如何将java编译器知道 会有 一个例外,没有对 其中 可能 仍然 代码为我的理解里面。 另外,强制捕获Checked异常而不是Unchecked意味着什么呢? 问题答案: J