import static org.apache.flink.configuration.ConfigOptions.key;
import java.util.Collection;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
private static final long serialVersionUID = 1L;
private String dbLogDir = "";
@Override
public DBOptions createDBOptions(DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
currentOptions = super.createDBOptions(currentOptions, handlesToClose);
currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
currentOptions.setStatsDumpPeriodSec(60);
currentOptions.setDbLogDir(dbLogDir);
return currentOptions;
}
@Override
public String toString() {
return this.getClass().toString() + "{" + super.toString() + '}';
}
/**
* Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
* data directory's absolute path will be used as the log file prefix).
*/
public void setDbLogDir(String dbLogDir) {
this.dbLogDir = dbLogDir;
}
public static final ConfigOption<String> LOG_DIR =
key("state.backend.rocksdb.log.dir")
.stringType()
.noDefaultValue()
.withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the " +
"data directory's absolute path will be used as the log file prefix)");
@Override
public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
DefaultConfigurableOptionsFactory optionsFactory =
super.configure(configuration);
this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
return optionsFactory;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(interval);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
RocksDBStateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath,true);
DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
options.setDbLogDir("file:///mnt/flink/storage/rocksdb/logging/");
env.setStateBackend(stateBackend);
stateBackend.setRocksDBOptions(options);
state.backend.rocksdb.log.dir: "file:///mnt/flink/storage/rocksdb/logging/"
state.backend.rocksdb.options-factory: com.myflinkcode.common.config.DefaultConfigurableOptionsFactoryWithLog
我浏览了完整的flink仪表板,但我没有得到任何线索,如何检查是增量检查点正在发生还是完全检查点正在发生。请帮助我如何设置RocksDB的日志记录来了解增量检查点是否正在发生。我在文档中看到RocksDB日志记录会在性能和存储方面造成巨大的成本,这是为了测试目的,之后我将禁用它
我不确定这些信息是否记录或显示在任何地方,但在您的代码中,您可以使用
stateBackend.isIncrementalCheckpointsEnabled()
以确定您的RocksDB状态后端是否启用了检查点,然后自己记录此信息。
注意,要启用增量检查点(默认情况下是关闭的),您需要配置
state.backend.incremental: true
我有多个Kafka主题(多租户),我运行同一个作业运行多次基于主题的数量,每个作业消耗来自一个主题的消息。我已将文件系统配置为状态后端。 假设有3个作业正在运行。这里的检查站是如何工作的?这3个作业是否都将检查点信息存储在同一路径中?如果任何作业失败,该作业如何知道从何处恢复检查点信息?我们过去常常在向flink集群提交作业时提供作业名称。这和它有什么关系吗?一般来说,Flink如何区分作业及其检
我正在做一个简单的程序,要求用户输入0-19之间的五个数字。我想在每个数字后面添加一些东西(比如if语句),以确保它在这个范围内。如果不是,程序应该说“请再读指令”,然后system.exit(0)。这段代码是相关的: 如有任何帮助,将不胜感激。
问题内容: 我刚刚使用MySQL查询浏览器创建了一个新表,并注意到在“自动增量列”下有一个勾号。这是如何运作的? 以编程方式添加到数据库时,是否仅添加一个数字,然后数据库会自动将该数字递增? 每次有新用户在我的网站上注册时,我都希望他们的客户ID(仅整数)自动递增,因此我不必尝试随机生成一个唯一的数字。 可以简单地完成吗? 谢谢! 问题答案: 以编程方式添加到数据库时,是否仅添加一个数字,然后数据
我正在尝试使用模式验证响应JSON。即使尝试使用错误的模式验证响应,测试用例也会通过。 JSON响应: JSON模式: 我预计这里会出现错误,因为响应不包含所需的served_imsi值。但考验正在通过。
问题内容: 阅读本书中的 Exception时,我发现了以下语句: 被检查的异常由编译器在编译时检查。 和 编译器不会在编译时检查未经检查的异常。 因此,如果我们也可以说或 位于Checked Exceptions类树之下。如何将java编译器知道 会有 一个例外,没有对 其中 可能 仍然 代码为我的理解里面。 另外,强制捕获Checked异常而不是Unchecked意味着什么呢? 问题答案: J