Redis officially recommends keeping a single node under about 16 GB of memory: saving an RDB from a larger instance relies on the fork system call, and copying the page tables of a big process introduces noticeable latency and jitter; according to the official tests the cost is roughly 10ms~300ms per GB. KeyDB uses multiple threads to scale across a node's cores, and for such large nodes the memory-size limit has to be addressed too, so KeyDB avoids the fork call with multi-version concurrency control (MVCC): https://docs.keydb.dev/docs/mvcc
KeyDB maintains multiple versions of each database: modifications land on the newest version while the older versions are read-only. Let's look at how MVCC replaces fork.
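Before walking through KeyDB's code, the core idea fits in a small self-contained model. The sketch below uses hypothetical types rather than KeyDB's API: taking a snapshot is an O(1) handoff of the current map to a read-only level, new writes go into a fresh map, and deletes are recorded as tombstones so that copies still held by older levels stay hidden.
#include <map>
#include <memory>
#include <set>
#include <string>

// Toy model of a snapshot chain -- hypothetical types, not KeyDB's.
struct Level {
    std::map<std::string, std::string> data; // keys written at this level
    std::set<std::string> tombstones;        // keys deleted relative to older levels
    std::shared_ptr<const Level> older;      // next (older, read-only) snapshot
};

struct ToyDb {
    std::shared_ptr<Level> current = std::make_shared<Level>();

    // "createSnapshot": freeze the current level and start a fresh one.
    // O(1) -- the maps are handed over, not copied.
    std::shared_ptr<const Level> snapshot() {
        std::shared_ptr<const Level> frozen = current;
        current = std::make_shared<Level>();
        current->older = frozen;
        return frozen;
    }

    void set(const std::string &k, const std::string &v) {
        current->tombstones.erase(k); // a new write un-deletes the key
        current->data[k] = v;         // shadows any copy in older levels
    }

    void del(const std::string &k) {
        current->data.erase(k);        // KeyDB only tombstones keys a snapshot
        current->tombstones.insert(k); // still holds; an extra one is harmless
    }
};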
The multi-version state is defined in redisDbPersistentData (redisDbPersistentDataSnapshot derives from it):
class redisDbPersistentData {
// Keyspace
dict *m_pdict = nullptr; /* The keyspace for this DB */
// Tracks keys deleted relative to the current snapshot
dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
// Snapshots are managed as a linked list; if this is not nullptr, the db currently has a snapshot
const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr;
std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
};
When BGSAVE is executed, rdbSaveBackground->launchRdbSaveThread creates a new snapshot for every db before the save thread starts; once the RDB file has been generated, the snapshots are released:
int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi){
...
// Take a snapshot of each db before the rdb save
for (int idb = 0; idb < cserver.dbnum; ++idb)
args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /* fOptional */);
...
if (pthread_create(&child, &tattr, rdbSaveThread, args)) {
pthread_attr_destroy(&tattr);
// Thread creation failed: release the snapshots and report the error
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
args->~rdbSaveThreadArgs();
zfree(args);
return C_ERR;
}
}
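For completeness, a hedged sketch of the success path (rdbSaveSnapshots is a hypothetical stand-in for the real serialization logic, and the real rdbSaveThread carries more plumbing): the save thread writes the RDB file from the frozen snapshots and then releases them, mirroring the cleanup in the error path above.
void *rdbSaveThread(void *vargs)
{
    rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)vargs;
    // Hypothetical helper: serialize args->rgpdb, ultimately driven by
    // iterate_threadsafe (shown below)
    int retval = rdbSaveSnapshots(args);
    // rdb done: release every db's snapshot
    for (int idb = 0; idb < cserver.dbnum; ++idb)
        g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
    args->~rdbSaveThreadArgs();
    zfree(args);
    return (void*)(intptr_t)retval; // hand the status back to pthread_join
}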
createSnapshot hands the current contents of the db to the snapshot and links it into the snapshot chain. From that point those dicts are read-only; fresh dicts are created to serve as the current db and receive all subsequent updates:
const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
...
if (m_spdbSnapshotHOLDER != nullptr)
{
// If possible reuse an existing snapshot (we want to minimize nesting)
// Check whether the existing snapshot is recent enough to be reused
if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint)
{
if (!m_spdbSnapshotHOLDER->FStale())
{
m_spdbSnapshotHOLDER->m_refCount++;
return m_spdbSnapshotHOLDER.get();
}
serverLog(LL_VERBOSE, "Existing snapshot too old, creating a new one");
}
}
// See if we have too many levels and can bail out of this to reduce load
// Cap the snapshot nesting depth to keep memory from ballooning
if (fOptional && (levels >= 6))
{
serverLog(LL_DEBUG, "Snapshot nesting too deep, abondoning");
return nullptr;
}
...
// The snapshot's dicts are read-only, so async rehashing must stop
discontinueAsyncRehash(m_pdict);
discontinueAsyncRehash(m_pdictTombstone);
spdb->m_fAllChanged = false;
spdb->m_fTrackingChanges = 0;
// Hand the current db's dicts to the snapshot directly, without copying
spdb->m_pdict = m_pdict;
spdb->m_pdictTombstone = m_pdictTombstone;
// Add a fake iterator so the dicts don't rehash (they need to be read only)
dictPauseRehashing(spdb->m_pdict);
dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now
spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
if (m_spstorage != nullptr)
spdb->m_spstorage = std::shared_ptr<StorageCache>(const_cast<StorageCache*>(m_spstorage->clone()));
spdb->m_pdbSnapshot = m_pdbSnapshot;
spdb->m_refCount = 1;
spdb->m_mvccCheckpoint = getMvccTstamp();
...
// Create fresh dicts for the current (writable) db
m_pdict = dictCreate(&dbDictType,this);
dictExpand(m_pdict, 1024); // minimize rehash overhead
m_pdictTombstone = dictCreate(&dbTombstoneDictType, this);
serverAssert(spdb->m_pdict->pauserehash == 1);
// Link the new snapshot at the head of the snapshot chain
m_spdbSnapshotHOLDER = std::move(spdb);
m_pdbSnapshot = m_spdbSnapshotHOLDER.get();
// Finally we need to take a ref on all our children snapshots. This ensures they aren't free'd before we are
redisDbPersistentData *pdbSnapshotNext = m_pdbSnapshot->m_spdbSnapshotHOLDER.get();
while (pdbSnapshotNext != nullptr)
{
pdbSnapshotNext->m_refCount++;
pdbSnapshotNext = pdbSnapshotNext->m_spdbSnapshotHOLDER.get();
}
if (m_pdbSnapshotASYNC != nullptr)
{
// free the async snapshot, it's done its job
endSnapshot(m_pdbSnapshotASYNC); // should be just a dec ref (FAST)
m_pdbSnapshotASYNC = nullptr;
}
...
}
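To make the resulting layout concrete, this is roughly what the chain looks like after two nested createSnapshot calls (an illustration, not code):
// current db (writable; freshly created m_pdict / m_pdictTombstone)
//   └─ m_pdbSnapshot ──► snapshot B (read-only; owns the dicts that were
//        │                current when B was taken)
//        └─ m_pdbSnapshot ──► snapshot A (read-only; the oldest data)
//
// A lookup walks current → B → A; a tombstone at any level hides the key
// from every level beneath it.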
Once the snapshot exists, rdbSave can serialize it into the RDB file. The core routine is iterate_threadsafe, which first walks the snapshot's own dict and then recursively walks the remaining snapshots in the chain:
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const
{
return iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, true);
}
bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(const char*, robj_roptr o)> &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const {
...
// First iterate this snapshot's own dict
dictIterator *di = dictGetSafeIterator(m_pdict);
while(fResult && ((de = dictNext(di)) != nullptr))
{
--celem;
robj *o = (robj*)dictGetVal(de);
if (!fn((const char*)dictGetKey(de), o))
fResult = false;
}
dictReleaseIterator(di);
...
// Then recursively iterate the older snapshots in the chain
const redisDbPersistentDataSnapshot *psnapshot;
__atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
if (fResult && psnapshot != nullptr)
{
std::function<bool(const char*, robj_roptr o)> fnNew = [&fn, &celem, dictTombstone](const char *key, robj_roptr o) {
// Skip keys that this level's tombstones mark as deleted
dictEntry *deTombstone = dictFind(dictTombstone, key);
if (deTombstone != nullptr)
return true;
// Alright it's a key in the use keyspace, lets ensure it and then pass it off
--celem;
return fn(key, o);
};
fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false);
}
return fResult;
}
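As a usage illustration (a hypothetical caller, not rdbSave's actual code), counting the keys visible in a snapshot looks like this; returning false from the callback aborts the iteration:
size_t count = 0;
snapshot->iterate_threadsafe(
    [&count](const char *key, robj_roptr o) {
        ++count;     // an RDB save would serialize key/o here instead
        return true; // false would stop the iteration
    },
    false /* fKeyOnly */, false /* fCacheOnly */);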
With snapshots in place, every CRUD operation on the db has to take them into account.
Inserting a KV pair:
bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew, dict_iter *piterExisting)
{
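// ensure() (described below with find()) materializes a key that only a
// snapshot holds into the current dict, so the dictAdd below can detect
// the existing entry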
if (!fAssumeNew && (g_pserver->m_pstorageFactory != nullptr || m_pdbSnapshot != nullptr))
ensure(key);
dictEntry *de;
// Add directly to the current db's dict
int res = dictAdd(m_pdict, key, o, &de);
serverAssert(FImplies(fAssumeNew, res == DICT_OK));
if (res == DICT_OK)
{
#ifdef CHECKED_BUILD
if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(key) != nullptr)
{
serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
}
#endif
trackkey(key, false /* fUpdate */);
}
else
{
if (piterExisting)
*piterExisting = dict_iter(m_pdict, de);
}
return (res == DICT_OK);
}
Looking up a KV pair:
dict_iter find(const char *key) {
// First search this level's m_pdict
dictEntry *de = dictFind(m_pdict, key);
// Then let ensure() search the snapshot chain recursively
ensure(key, &de);
return dict_iter(m_pdict, de);
}
ensure() recursively searches each snapshot in the chain for the key. At every level it first consults that snapshot's m_pdictTombstone; if the key is already marked deleted there, the search stops.
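Continuing the toy model from the top of the article, the fall-through that ensure implements looks like this; a tombstone at any level hides the key from every level beneath it, so the search can stop early:
// Toy-model lookup mirroring ensure()'s snapshot fall-through.
const std::string *get(const ToyDb &db, const std::string &k) {
    for (const Level *lvl = db.current.get(); lvl != nullptr;
         lvl = lvl->older.get()) {
        if (lvl->tombstones.count(k))
            return nullptr;      // marked deleted at this level: stop
        auto it = lvl->data.find(k);
        if (it != lvl->data.end())
            return &it->second;  // the newest copy wins
    }
    return nullptr;              // key not present at any level
}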
Updating a KV pair:
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
// If this write will overwrite a key held by the snapshot, tombstone the snapshotted copy first
db->prepOverwriteForSnapshot(szFromObj(key));
dict_iter iter;
if (!dbAddCore(db, szFromObj(key), val, true /* fUpdateMvcc */, false /*fAssumeNew*/, &iter)) {
dbOverwrite(db, key, val, !keepttl, &iter);
}
incrRefCount(val);
if (signal) signalModifiedKey(c,db,key);
}
void redisDbPersistentData::prepOverwriteForSnapshot(char *key)
{
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU)
return;
if (m_pdbSnapshot != nullptr)
{
// If the snapshot holds the key, add it to m_pdictTombstone to mark it deleted at this level (the stored value is the key's precomputed hash)
auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
if (itr.key() != nullptr)
{
if (itr.val()->FExpires()) {
// Note: I'm sure we could handle this, but its too risky at the moment.
// There are known bugs doing this with expires
return;
}
sds keyNew = sdsdupshared(itr.key());
if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK)
sdsfree(keyNew);
}
}
}
Deleting a KV pair:
bool redisDbPersistentData::syncDelete(robj *key)
{
...
if (m_spstorage != nullptr)
fDeleted = m_spstorage->erase(szFromObj(key));
// First delete from the current m_pdict
fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
if (fDeleted) {
...
if (m_pdbSnapshot != nullptr)
{
// If a snapshot still holds the key, record a tombstone in m_pdictTombstone so the snapshotted copy stays hidden
auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key));
if (itr != nullptr)
{
sds keyTombstone = sdsdupshared(itr.key());
uint64_t hash = dictGetHash(m_pdict, keyTombstone);
if (dictAdd(m_pdictTombstone, keyTombstone, (void*)hash) != DICT_OK)
sdsfree(keyTombstone);
}
}
if (g_pserver->cluster_enabled) slotToKeyDel(szFromObj(key));
return 1;
} else {
return 0;
}
}
When a snapshot is no longer needed, endSnapshot merges the snapshot's dict back into the current db's; before the merge, keys that have been marked deleted are purged:
void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
{
// If the first-level snapshot holds its last reference it can be freed, so first recursively release the deeper snapshots in the chain
if (m_spdbSnapshotHOLDER->m_refCount == 1)
recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());
// Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
dictIterator *di = dictGetIterator(m_pdictTombstone);
dictEntry *de;
dictPauseRehashing(m_spdbSnapshotHOLDER->m_pdict);
auto splazy = std::make_unique<LazyFree>();
while ((de = dictNext(di)) != NULL)
{
dictEntry **dePrev;
dictht *ht;
// BUG BUG Why not a shallow search?
dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/);
if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot)
{
// The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt)
#ifdef CHECKED_BUILD
serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr);
#endif
// Propagate this level's tombstone down to the next snapshot
dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de));
continue;
}
else if (deSnapshot == nullptr)
{
serverAssert(m_spdbSnapshotHOLDER->m_spstorage != nullptr); // the only case where we can have a tombstone without a snapshot child is if a storage engine is set
continue;
}
// Delete the object from the source dict, we don't use dictDelete to avoid a second search
*dePrev = deSnapshot->next; // Unlink it first
if (deSnapshot != nullptr) {
if (m_spdbSnapshotHOLDER->m_pdict->asyncdata != nullptr) {
dictFreeUnlinkedEntry(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
} else {
splazy->vecde.push_back(deSnapshot);
}
}
ht->used--;
}
// Stage 2 Move all new keys to the snapshot DB
// merge snapshot
dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict);
// Stage 3 swap the databases with the snapshot
// Detach the snapshot by swapping its dicts back into the current db
std::swap(m_pdict, m_spdbSnapshotHOLDER->m_pdict);
if (m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
std::swap(m_pdictTombstone, m_spdbSnapshotHOLDER->m_pdictTombstone);
...
}
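The same merge semantics can be expressed in the toy model from earlier (a sketch, not KeyDB's implementation), mirroring the three stages above:
// Toy-model endSnapshot: fold the newest read-only level back into the db.
void end_snapshot(ToyDb &db) {
    std::shared_ptr<const Level> frozen = db.current->older;
    if (frozen == nullptr)
        return;
    // The real code mutates the snapshot in place (it holds the last
    // reference); the toy copies it for simplicity.
    auto merged = std::make_shared<Level>(*frozen);
    // Stage 1: apply the current level's tombstones. A hit deletes the
    // snapshot's entry; a miss means the key lives in an even older level,
    // so the tombstone is propagated downward.
    for (const std::string &k : db.current->tombstones) {
        if (merged->data.erase(k) == 0)
            merged->tombstones.insert(k);
    }
    // Stage 2: newer writes overwrite the snapshot's entries.
    for (const auto &kv : db.current->data)
        merged->data[kv.first] = kv.second;
    // Stage 3: the merged level becomes the current, writable db again.
    db.current = merged;
}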
Besides replacing fork for RDB generation, MVCC is also used by the keys and scan commands, both of which need a point-in-time snapshot of the current DB.
keys:
void keysCommand(client *c) {
sds pattern = szFromObj(c->argv[1]);
const redisDbPersistentDataSnapshot *snapshot = nullptr;
// Create an optional snapshot (may return nullptr, e.g. when nesting is too deep)
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */);
if (snapshot != nullptr)
{
sds patternCopy = sdsdup(pattern);
aeEventLoop *el = serverTL->el;
blockClient(c, BLOCKED_ASYNC);
redisDb *db = c->db;
// Iterate the snapshot on an async worker thread
g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{
keysCommandCore(c, snapshot, patternCopy);
sdsfree(patternCopy);
aePostFunction(el, [c, db, snapshot]{
aeReleaseLock(); // we need to lock with coordination of the client
std::unique_lock<decltype(c->lock)> lock(c->lock);
AeLocker locker;
locker.arm(c);
unblockClient(c);
locker.disarm();
lock.unlock();
db->endSnapshotAsync(snapshot);
aeAcquireLock();
});
});
}
else
{
keysCommandCore(c, c->db, pattern);
}
}
The scan command is implemented on top of client::asyncCommand:
bool client::asyncCommand(
std::function<void(const redisDbPersistentDataSnapshot *,
const std::vector<robj_sharedptr> &)> &&mainFn,
std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn) {
serverAssert(FCorrectThread(this));
const redisDbPersistentDataSnapshot *snapshot = nullptr;
if (!(this->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
snapshot =
this->db->createSnapshot(this->mvccCheckpoint, false /* fOptional */);
if (snapshot == nullptr) {
return false;
}
aeEventLoop *el = serverTL->el;
blockClient(this, BLOCKED_ASYNC);
g_pserver->asyncworkqueue->AddWorkFunction(
[el, this, mainFn, postFn, snapshot] {
std::vector<robj_sharedptr> args = clientArgs(this);
aePostFunction(el, [this, mainFn, postFn, snapshot, args] {
aeReleaseLock();
std::unique_lock<decltype(this->lock)> lock(this->lock);
AeLocker locker;
locker.arm(this);
unblockClient(this);
mainFn(snapshot, args);
locker.disarm();
lock.unlock();
if (postFn)
postFn(snapshot);
this->db->endSnapshotAsync(snapshot);
aeAcquireLock();
});
});
return true;
}
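To close, a hedged sketch of how a command can sit on top of asyncCommand (a hypothetical caller; scan's real cursor logic lives inside its mainFn). When no snapshot can be created, for example inside MULTI, asyncCommand returns false and the caller has to fall back to the synchronous path:
bool handled = c->asyncCommand(
    // mainFn: invoked with the frozen snapshot and a copy of the client's
    // argv once the client has been unblocked
    [](const redisDbPersistentDataSnapshot *snapshot,
       const std::vector<robj_sharedptr> &args) {
        // e.g. walk the snapshot with iterate_threadsafe and build the reply
    },
    nullptr /* postFn: optional cleanup step */);
if (!handled) {
    // No snapshot available (CLIENT_MULTI / CLIENT_BLOCKED): run the
    // command synchronously against the live db instead
}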