当前位置: 首页 > 工具软件 > RocksDB > 使用案例 >

rocksdb的基本操作

单于阳
2023-12-01

基本操作

该库提供持久性键值存储。键和值是任意字节数组。键根据用户指定的比较器功能在键值存储中排序。rocksdb

打开数据库

数据库具有与文件系统目录相对应的名称。数据库的所有内容都存储在此目录中。下面的示例演示如何打开数据库,并在必要时创建数据库:rocksdb

  #include <cassert>
  #include "rocksdb/db.h"

  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  assert(status.ok());
  ...

如果要在数据库已存在时引发错误,请在调用之前添加以下行:rocksdb::DB::Open

  options.error_if_exists = true;

If you are porting code from to , you can convert your object to a object using , which has the same functionality as :leveldbrocksdbleveldb::Optionsrocksdb::Optionsrocksdb::LevelDBOptionsleveldb::Options

  #include "rocksdb/utilities/leveldb_options.h"

  rocksdb::LevelDBOptions leveldb_options;
  leveldb_options.option1 = value1;
  leveldb_options.option2 = value2;
  ...
  rocksdb::Options options = rocksdb::ConvertOptions(leveldb_options);

数据库选项

用户可以选择始终在代码中显式设置选项字段,如上所示。或者,您也可以通过字符串到字符串映射或选项字符串来设置它。

选项字符串和选项映射

某些选项可以在数据库运行时动态更改。例如:
用户通过 Options 类将选项传递给 RocksDB。除了在 Options 类中设置选项外,还有另外两种方法可以设置它:

从选项文件中获取选项类。
从选项字符串中获取它
从字符串映射中获取它
选项字符串
若要从字符串中获取选项,请调用便利函数或使用包含信息的字符串。还有一个特殊的和获取表特定的选项。GetColumnFamilyOptionsFromString()GetDBOptionsFromString()GetBlockBasedTableOptionsFromString()GetPlainTableOptionsFromString()

table_factory=PlainTable;prefix_extractor=rocksdb.CappedPrefix.13;comparator=leveldb.BytewiseComparator;compression_per_level=kBZip2Compression:kBZip2Compression:kBZip2Compression:kNoCompression:kZlibCompression:kBZip2Compression:kSnappyCompression;max_bytes_for_level_base=986;bloom_locality=8016;target_file_size_base=4294976376;memtable_huge_page_size=2557;max_successive_merges=5497;max_sequential_skip_in_iterations=4294971408;arena_block_size=1893;target_file_size_multiplier=35;min_write_buffer_number_to_merge=9;max_write_buffer_number=84;write_buffer_size=1653;max_compaction_bytes=64;max_bytes_for_level_multiplier=60;memtable_factory=SkipListFactory;compression=kNoCompression;bottommost_compression=kDisableCompressionOption;min_partial_merge_operands=7576;level0_stop_writes_trigger=33;num_levels=99;level0_slowdown_writes_trigger=22;level0_file_num_compaction_trigger=14;compaction_filter=urxcqstuwnCompactionFilter;soft_rate_limit=530.615385;soft_pending_compaction_bytes_limit=0;max_write_buffer_number_to_maintain=84;verify_checksums_in_compaction=false;merge_operator=aabcxehazrMergeOperator;memtable_prefix_bloom_size_ratio=0.4642;memtable_insert_with_hint_prefix_extractor=rocksdb.CappedPrefix.13;paranoid_file_checks=true;force_consistency_checks=true;inplace_update_num_locks=7429;optimize_filters_for_hits=false;level_compaction_dynamic_level_bytes=false;inplace_update_support=false;compaction_style=kCompactionStyleFIFO;purge_redundant_kvs_while_flush=true;hard_pending_compaction_bytes_limit=0;disable_auto_compactions=false;report_bg_io_stats=true;compaction_filter_factory=mpudlojcujCompactionFilterFactory;

选项字符串的示例如下所示


rocksdb::Status s;
s = db->SetOptions({{"write_buffer_size", "131072"}});
assert(s.ok());
s = db->SetDBOptions({{"max_background_flushes", "2"}});
assert(s.ok());

RocksDB 会自动将数据库中使用的选项保存在 DB 目录下的 OPTIONS-xxxx 文件中。用户可以通过从这些选项文件中提取选项来选择在数据库重新启动后保留选项值

数据库选项文件

在 RocksDB 4.3 中,我们添加了一系列功能,使管理 RocksDB 选项变得更加容易。

现在,每个 RocksDB 数据库都会在每次成功调用 DB::Open()、SetOptions() 和 CreateColumnFamily() / DropColumnFamily() 时自动将其当前选项集保存到文件中。

LoadLatestOptions() / LoadOptionsFromFile():从选项文件构造 RocksDB 选项对象的函数。

CheckOptionsCompatibility :对两组 RocksDB 选项执行兼容性检查的函数。

借助上述选项文件支持,开发人员不再需要维护以前创建的 RocksDB 实例的完整选项集。此外,当需要更改选项时,CheckOptionsCompatibility () 可以进一步确保生成的选项集可以成功打开相同的 RocksDB 数据库,而不会损坏底层数据

s = DB::Open(rocksdb_options, path_to_db, &db);
...
// Create column family, and rocksdb will persist the options.
ColumnFamilyHandle* cf;
s = db->CreateColumnFamily(ColumnFamilyOptions(), "new_cf", &cf);
...
// close DB
delete cf;
delete db;

首先,我们调用 LoadLatestOptions() 来加载目标 RocksDB 数据库使用的最新选项集:

ConfigOptions cfg_opts;
DBOptions loaded_db_opt;
std::vector<ColumnFamilyDescriptor> loaded_cf_descs;
LoadLatestOptions(cfg_opts, path_to_db, &loaded_db_opt, &loaded_cf_descs);

完整的数据库配置格式如下

[Version]
  rocksdb_version=4.3.0
  options_file_version=1.1

[DBOptions]
  stats_dump_period_sec=600
  max_manifest_file_size=18446744073709551615
  bytes_per_sync=8388608
  delayed_write_rate=2097152
  WAL_ttl_seconds=0
  WAL_size_limit_MB=0
  max_subcompactions=1
  wal_dir=
  wal_bytes_per_sync=0
  db_write_buffer_size=0
  keep_log_file_num=1000
  table_cache_numshardbits=4
  max_file_opening_threads=1
  writable_file_max_buffer_size=1048576
  random_access_max_buffer_size=1048576
  use_fsync=false
  max_total_wal_size=0
  max_open_files=-1
  skip_stats_update_on_db_open=false
  max_background_compactions=16
  manifest_preallocation_size=4194304
  max_background_flushes=7
  is_fd_close_on_exec=true
  max_log_file_size=0
  advise_random_on_open=true
  create_missing_column_families=false
  paranoid_checks=true
  delete_obsolete_files_period_micros=21600000000
  log_file_time_to_roll=0
  compaction_readahead_size=0
  create_if_missing=false
  use_adaptive_mutex=false
  enable_thread_tracking=false
  allow_fallocate=true
  error_if_exists=false
  recycle_log_file_num=0
  db_log_dir=
  skip_log_error_on_recovery=false
  new_table_reader_for_compaction_inputs=true
  allow_mmap_reads=false
  allow_mmap_writes=false
  use_direct_reads=false
  use_direct_writes=false


[CFOptions "default"]
  compaction_style=kCompactionStyleLevel
  compaction_filter=nullptr
  num_levels=6
  table_factory=BlockBasedTable
  comparator=leveldb.BytewiseComparator
  max_sequential_skip_in_iterations=8
  max_bytes_for_level_base=1073741824
  memtable_prefix_bloom_probes=6
  memtable_prefix_bloom_bits=0
  memtable_prefix_bloom_huge_page_tlb_size=0
  max_successive_merges=0
  arena_block_size=16777216
  min_write_buffer_number_to_merge=1
  target_file_size_multiplier=1
  source_compaction_factor=1
  max_bytes_for_level_multiplier=8
  max_bytes_for_level_multiplier_additional=2:3:5
  compaction_filter_factory=nullptr
  max_write_buffer_number=8
  level0_stop_writes_trigger=20
  compression=kSnappyCompression
  level0_file_num_compaction_trigger=4
  purge_redundant_kvs_while_flush=true
  max_write_buffer_size_to_maintain=0
  memtable_factory=SkipListFactory
  max_grandparent_overlap_factor=8
  expanded_compaction_factor=25
  hard_pending_compaction_bytes_limit=137438953472
  inplace_update_num_locks=10000
  level_compaction_dynamic_level_bytes=true
  level0_slowdown_writes_trigger=12
  filter_deletes=false
  verify_checksums_in_compaction=true
  min_partial_merge_operands=2
  paranoid_file_checks=false
  target_file_size_base=134217728
  optimize_filters_for_hits=false
  merge_operator=PutOperator
  compression_per_level=kNoCompression:kNoCompression:kNoCompression:kSnappyCompression:kSnappyCompression:kSnappyCompression
  compaction_measure_io_stats=false
  prefix_extractor=nullptr
  bloom_locality=0
  write_buffer_size=134217728
  disable_auto_compactions=false
  inplace_update_support=false

[TableOptions/BlockBasedTable "default"]
  format_version=2
  whole_key_filtering=true
  no_block_cache=false
  checksum=kCRC32c
  filter_policy=rocksdb.BuiltinBloomFilter
  block_size_deviation=10
  block_size=8192
  block_restart_interval=16
  cache_index_and_filter_blocks=false
  pin_l0_filter_and_index_blocks_in_cache=false
  pin_top_level_index_and_filter=false
  index_type=kBinarySearch
  flush_block_policy_factory=FlushBlockBySizePolicyFactory

status

此类型的值由大多数函数返回,因为可能会遇到错误。您可以检查这样的结果是否正常,还可以打印相关的错误消息:

rocksdb::Statusrocksdb

   rocksdb::Status s = ...;
   if (!s.ok()) cerr << s.ToString() << endl;

关闭数据库

完成数据库后,有两种方法可以正常关闭数据库 -

只需删除数据库对象即可。这将释放数据库打开时保留的所有资源。但是,如果在释放任何资源时遇到任何错误,例如关闭info_log文件时出错,它将丢失。
调用 ,然后删除数据库对象。返回,可以检查以确定是否有任何错误。无论错误如何,都将释放所有资源并且是不可逆的。DB::Close()DB::Close()StatusDB::Close()

  ... open the db as described above ...
  ... do something with db ...
  delete db;

或者

  ... open the db as described above ...
  ... do something with db ...
  Status s = db->Close();
  ... log status ...
  delete db;

数据库提供 、 和方法来修改/查询数据库。例如,下面的代码将存储在 key1 下的值移动到 key2。PutDeleteGetMultiGet

  std::string value;
  rocksdb::Status s = db->Get(rocksdb::ReadOptions(), key1, &value);
  if (s.ok()) s = db->Put(rocksdb::WriteOptions(), key2, value);
  if (s.ok()) s = db->Delete(rocksdb::WriteOptions(), key1);

现在,值大小必须小于 4GB。 RocksDB还允许单一删除,这在某些特殊情况下很有用。

每个结果至少会产生一个从源到值字符串的内存。如果源位于块缓存中,则可以使用 PinnableSlice 来避免额外的副本。Get

  PinnableSlice pinnable_val;
  rocksdb::Status s = db->Get(rocksdb::ReadOptions(), key1, &pinnable_val);

一旦pinnable_val被销毁或在其上调用 ::Reset ,源将被释放。
当从数据库中读取多个密钥时,可以使用。有两种变体:1.以更高性能的方式从单个列系列中读取多个键,即它可以比循环调用更快,以及2.跨多个列系列读取彼此一致的键。MultiGetMultiGetGet

  std::vector<Slice> keys;
  std::vector<PinnableSlice> values;
  std::vector<Status> statuses;

  for ... {
    keys.emplace_back(key);
  }
  values.resize(keys.size());
  statuses.resize(keys.size());

  db->MultiGet(ReadOptions(), cf, keys.size(), keys.data(), values.data(), statuses.data());

为了避免内存分配的开销, 和更高版本的类型可以是堆栈上的类型,也可以是提供连续存储的任何其他类型。keysvaluesstatusesstd::array

  std::vector<ColumnFamilyHandle*> column_families;
  std::vector<Slice> keys;
  std::vector<std::string> values;

  for ... {
    keys.emplace_back(key);
    column_families.emplace_back(column_family);
  }
  values.resize(keys.size());

  std::vector<Status> statuses = db->MultiGet(ReadOptions(), column_families, keys, &values);

原子更新
请注意,如果进程在 Put of key2 之后但在删除 key1 之前死亡,则相同的值可能会存储在多个键下。通过使用类以原子方式应用一组更新,可以避免此类问题:WriteBatch

  #include "rocksdb/write_batch.h"
  ...
  std::string value;
  rocksdb::Status s = db->Get(rocksdb::ReadOptions(), key1, &value);
  if (s.ok()) {
    rocksdb::WriteBatch batch;
    batch.Delete(key1);
    batch.Put(key2, value);
    s = db->Write(rocksdb::WriteOptions(), &batch);
  }

保存要对数据库进行的一系列编辑,批处理中的这些编辑将按顺序应用。请注意,我们之前调用过,以便 if 与 相同,我们最终不会错误地完全删除该值。WriteBatchDeletePutkey1key2

除了原子性优势外,还可以通过将大量单个突变放入 同一批。WriteBatch

同步写入

默认情况下,每个写入都是异步的:它将写入从进程推送到操作系统后返回。从操作系统内存到基础持久存储的传输以异步方式进行。可以为特定写入打开该标志,以使写入操作在写入的数据一直推送到持久存储之前不会返回。(在 Posix 系统上,这是通过在写入操作返回之前调用 or 或 来实现的。rocksdbsyncfsync(…)fdatasync(…)msync(…, MS_SYNC)

  rocksdb::WriteOptions write_options;
  write_options.sync = true;
  db->Put(write_options, ...);

非同步写入

对于非同步写入,RocksDB 仅在操作系统缓冲区或内部缓冲区中缓冲 WAL 写入(当 options.manual_wal_flush = true 时)。它们通常比同步写入快得多。非同步写入的缺点是计算机崩溃可能会导致最后几个更新丢失。请注意,仅写入进程的崩溃(即,不是重新启动)不会造成任何损失,因为即使为 false,更新也会在被视为完成之前从进程内存推送到操作系统。sync

通常可以安全地使用非同步写入。例如,将大量数据加载到数据库中时,可以通过在崩溃后重新启动批量加载来处理丢失的更新。混合方案也是可能的,其中由单独的线程调用。DB::SyncWAL()

我们还提供了一种完全禁用特定写入的预写日志的方法。如果设置为 true,则写入操作根本不会转到日志,并且可能会在进程崩溃时丢失。write_options.disableWAL

默认情况下,RocksDB 用于同步文件,在某些情况下可能比 fsync() 更快。如果要使用 fsync(),可以设置为 true。您应该在像 ext3 这样的文件系统上将其设置为 true,这些文件系统可能会在重新启动后丢失文件。fdatasync()Options::use_fsync

并发

一个数据库一次只能由一个进程打开。该实现从操作系统获取锁以防止滥用。在单个进程中,同一对象可以由多个并发线程安全地共享。即,不同的线程可以在没有任何外部同步的情况下写入或获取迭代器或调用同一个数据库(rocksdb 实现将自动执行所需的同步)。但是,其他对象(如 Iterator 和 WriteBatch)可能需要外部同步。如果两个线程共享此类对象,则必须使用自己的锁定协议保护对该对象的访问。公共头文件中提供了更多详细信息。rocksdbrocksdb::DBGet

迭 代

下面的示例演示如何打印数据库中的所有(键、值)对。

  rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    cout << it->key().ToString() << ": " << it->value().ToString() << endl;
  }
  assert(it->status().ok()); // Check for any errors found during the scan
  delete it;

以下变体显示了如何仅处理 范围:[start, limit)

  for (it->Seek(start);
       it->Valid() && it->key().ToString() < limit;
       it->Next()) {
    ...
  }
  assert(it->status().ok()); // Check for any errors found during the scan

您还可以按相反的顺序处理条目。(警告:反向 迭代可能比正向迭代慢一些。

  for (it->SeekToLast(); it->Valid(); it->Prev()) {
    ...
  }
  assert(it->status().ok()); // Check for any errors found during the scan

下面是从一个特定键以相反顺序处理范围(限制、开始)中的条目的示例:

  for (it->SeekForPrev(start);
       it->Valid() && it->key().ToString() > limit;
       it->Prev()) {
    ...
  }
  assert(it->status().ok()); // Check for any errors found during the scan

快照

快照为键值存储的整个状态提供一致的只读视图。 可能为非 NULL,以指示读取应对特定版本的数据库状态进行操作。ReadOptions::snapshot

如果为 NULL,则读取将对当前状态的隐式快照进行操作。ReadOptions::snapshot

快照由 DB::GetSnapshot() 方法创建:

  rocksdb::ReadOptions options;
  options.snapshot = db->GetSnapshot();
  ... apply some updates to db ...
  rocksdb::Iterator* iter = db->NewIterator(options);
  ... read using iter to view the state when the snapshot was created ...
  delete iter;
  db->ReleaseSnapshot(options.snapshot);

请注意,当不再需要快照时,应使用 DB::ReleaseSnapshot 接口释放快照。这允许实现摆脱仅为了支持从该快照开始读取而维护的状态。

slice

上面的 and 调用的返回值是该类型的实例。 是一个简单的结构,包含一个长度和一个指向外部字节数组的指针。返回 a 是返回 a 的更便宜的替代方法,因为我们不需要复制潜在的大键和值。此外,方法不返回以 null 结尾的 C 样式字符串,因为允许键和值包含“\0”字节。it->key()it->value()rocksdb::SliceSliceSlicestd::stringrocksdbrocksdb

C++字符串和以 null 结尾的 C 样式字符串可以轻松转换为切片:

   rocksdb::Slice s1 = "hello";

   std::string str("world");
   rocksdb::Slice s2 = str;

切片可以很容易地转换回C++字符串:

   std::string str = s1.ToString();
   assert(str == std::string("hello"));

使用切片时要小心,因为由调用方来确保切片指向的外部字节数组在使用切片时保持活动状态。例如,以下内容是错误的:

   rocksdb::Slice slice;
   if (...) {
     std::string str = ...;
     slice = str;
   }
   Use(slice);

当语句超出范围时,将被销毁,

事务提交

RocksDB 现在支持多操作事务,

事务数据库:

使用 TransactionDB时,RocksDB 在内部锁定写入的所有密钥以执行冲突检测。如果无法锁定密钥,该操作将返回错误。提交事务时,只要能够写入数据库,就可以保证事务成功。

TransactionDB* txn_db;
Status s = TransactionDB::Open(options, txndb_options, path, &txn_db);

Transaction* txn = txn_db->BeginTransaction(write_options, txn_options);
s = txn->Put(“key”, “value”);
s = txn->Delete(“key2”);
s = txn->Merge(“key3”, “value”);
s = txn->Commit();
delete txn;

乐观事务数据库

DB* db;
OptimisticTransactionDB* txn_db;

Status s = OptimisticTransactionDB::Open(options, path, &txn_db);
db = txn_db->GetBaseDB();

OptimisticTransaction* txn = txn_db->BeginTransaction(write_options, txn_options);
txn->Put(“key”, “value”);
txn->Delete(“key2”);
txn->Merge(“key3”, “value”);
s = txn->Commit();
delete txn;

从事务中读取

db->Put(write_options, “a”, “old”);
db->Put(write_options, “b”, “old”);
txn->Put(“a”, “new”);

vector<string> values;
vector<Status> results = txn->MultiGet(read_options, {“a”, “b”}, &values);
//  The value returned for key “a” will be “new” since it was written by this transaction.
//  The value returned for key “b” will be “old” since it is unchanged in this transaction.

缓存

数据库的内容存储在文件系统中的一组文件中,每个文件存储一系列压缩块。如果为 非 NULL,则用于缓存常用的未压缩块内容。我们使用操作系统文件缓存来缓存压缩的原始数据。因此,文件缓存充当压缩数据的缓存。options.block_cache

  #include "rocksdb/cache.h"
  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_cache = rocksdb::NewLRUCache(100 * 1048576); // 100MB uncompressed cache

  rocksdb::Options options;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
  rocksdb::DB* db;
  rocksdb::DB::Open(options, name, &db);
  ... use the db ...
  delete db

执行批量读取时,应用程序可能希望禁用缓存,以便批量读取处理的数据最终不会取代大多数缓存内容。每个迭代器选项可用于实现此目的:

  rocksdb::ReadOptions options;
  options.fill_cache = false;
  rocksdb::Iterator* it = db->NewIterator(options);
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ...
  }

校验

rocksdb将校验和与其存储在文件系统中的所有数据相关联。对于验证这些校验和的主动程度,提供了两个单独的控件:

ReadOptions::verify_checksums强制对代表特定读取从文件系统读取的所有数据进行校验和验证。默认情况下处于启用状态。
Options::paranoid_checks可以在打开数据库之前设置为 true,以使数据库实现在检测到内部损坏时立即引发错误。根据数据库的哪个部分已损坏,打开数据库时可能会引发错误,或者稍后由另一个数据库操作引发错误。默认情况下,偏执检查处于打开状态。
校验和验证也可以通过调用 手动触发。此 API 遍历所有列族的所有级别中的所有 SST 文件,并针对每个 SST 文件验证元数据和数据块中嵌入的校验和。目前仅支持BlockBasedTable格式。这些文件是串行验证的,因此 API 调用可能需要很长时间才能完成。此 API 可用于主动验证分布式系统中的数据完整性,例如,如果发现数据库已损坏,则可以创建新的副本。DB::VerifyChecksum()

如果数据库已损坏(也许在打开偏执检查时无法打开),则可以使用该函数来恢复尽可能多的数据。rocksdb::RepairDB

清除 WAL 文件

默认情况下,当旧的预写日志超出范围并且应用程序不再需要它们时,会自动删除它们。有一些选项使用户能够存档日志,然后以 TTL 方式或基于大小限制延迟删除它们。

选项包括 和 。以下是它们的使用方法:Options::WAL_ttl_secondsOptions::WAL_size_limit_MB

如果两者都设置为 0,日志将尽快删除,并且永远不会进入存档。

如果为 0 且 WAL_size_limit_MB 不是 0,则将每 10 分钟检查一次 WAL 文件,如果总大小更大,则从最早开始删除它们,直到满足size_limit。所有空文件都将被删除。WAL_ttl_secondsWAL_size_limit_MB

如果不是 0 并且 WAL_size_limit_MB 是 0,则将每检查一次 WAL 文件,并且将删除早于 WAL_ttl_seconds 的文件。WAL_ttl_secondsWAL_ttl_seconds / 2

如果两者都不为 0,则将每 10 分钟检查一次 WAL 文件,并且两个检查都将以 ttl 为第一个执行。

 类似资料: