当前位置: 首页 > 工具软件 > KeyDB > 使用案例 >

KeyDB源码解析三——多版本控制

濮阳繁
2023-12-01

Redis官方推荐的单节点内存大小不超过16G,因为生成rdb时需要使用fork系统调用,内存过大会导致较大的延时,从而引起系统抖动;根据官方的测试结果,延时大约为10ms~300ms/GB。KeyDB采用多线程来发挥节点的多核能力,对于大节点,同样需要解决内存大小的限制,因此KeyDB采用多版本并发控制(MVCC)的方式来避免fork系统调用:https://docs.keydb.dev/docs/mvcc

MVCC

KeyDB在系统中维护数据库的多个版本,修改在最新版本上,其他版本只读,看下MVCC如何替代fork。
多版本数据定义在redisDbPersistentData中(快照类型redisDbPersistentDataSnapshot使用同样的成员):

// Excerpt of the per-database data container: the live keyspace plus the
// bookkeeping needed while one or more snapshots of it exist.
class redisDbPersistentData {
	// Keyspace
  dict *m_pdict = nullptr;          /* The keyspace for this DB */
  // Tombstones: keys that have been deleted while a snapshot exists
  dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
  // Snapshots are managed as a linked list; this points at the next snapshot.
  // A non-null value means this DB currently has a snapshot.
  const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr;
  // Owning pointer to the snapshot object for this DB.
  std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
};

在执行BGSAVE命令时:rdbSaveBackground->launchRdbSaveThread,会创建一个新的snapshot,RDB文件生成之后,会释放snapshot(下面的代码片段只展示了创建snapshot,以及保存线程创建失败时释放snapshot的路径):

// Excerpt (elided parts are marked with "..."): takes a snapshot of every
// database and starts the thread that runs rdbSaveThread over them.
// Returns C_ERR when the thread could not be created; the success path is not
// part of this excerpt.
int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi){
	...
	// Before saving the RDB, create a snapshot of each database
	for (int idb = 0; idb < cserver.dbnum; ++idb)
            args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /* fOptional */);
    ...
    if (pthread_create(&child, &tattr, rdbSaveThread, args)) {
       pthread_attr_destroy(&tattr);
        // pthread_create returned non-zero (failure): the save thread will never
        // run, so end the snapshots taken above and free the thread arguments
        for (int idb = 0; idb < cserver.dbnum; ++idb)
            g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
        args->~rdbSaveThreadArgs();
        zfree(args);
        return C_ERR;
    }
}

创建snapshot:把当前db的dict直接交给snapshot,并把snapshot插入snapshot链表中,此后这部分数据就只读了;随后生成新的dict,作为当前的db,用来接收后续的更新

// Excerpt (elided parts are marked with "..."): creates a snapshot of this DB,
// or reuses the existing one when it is recent enough.
//
// mvccCheckpoint: compared against the existing snapshot's m_mvccCheckpoint to
//                 decide whether that snapshot can be reused.
// fOptional:      when true the call may give up and return nullptr if the
//                 snapshot nesting (`levels`, computed in elided code) is >= 6.
// Returns the snapshot (owned by m_spdbSnapshotHOLDER), or nullptr when an
// optional snapshot was declined.
const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
	...
	if (m_spdbSnapshotHOLDER != nullptr)
    {
        // If possible reuse an existing snapshot (we want to minimize nesting)
        // Check whether the existing snapshot is recent enough to be reused
        if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint)
        {
            if (!m_spdbSnapshotHOLDER->FStale())
            {
                // Reuse: just take another reference on the existing snapshot
                m_spdbSnapshotHOLDER->m_refCount++;
                return m_spdbSnapshotHOLDER.get();
            }
            serverLog(LL_VERBOSE, "Existing snapshot too old, creating a new one");
        }
    }

    // See if we have too many levels and can bail out of this to reduce load
    // Only optional requests are refused; mandatory ones always proceed
    if (fOptional && (levels >= 6))
    {
        serverLog(LL_DEBUG, "Snapshot nesting too deep, abondoning");
        return nullptr;
    }
    ...
    // The snapshot's dicts are read-only, so stop any asynchronous rehash on them
    discontinueAsyncRehash(m_pdict);
    discontinueAsyncRehash(m_pdictTombstone);

    spdb->m_fAllChanged = false;
    spdb->m_fTrackingChanges = 0;
    // Hand the current DB's dicts to the snapshot directly (pointer assignment)
    spdb->m_pdict = m_pdict;
    spdb->m_pdictTombstone = m_pdictTombstone;
    // Add a fake iterator so the dicts don't rehash (they need to be read only)
    dictPauseRehashing(spdb->m_pdict);
    dictForceRehash(spdb->m_pdictTombstone);    // prevent rehashing by finishing the rehash now
    // The previous snapshot (if any) becomes the child of the new snapshot
    spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
    if (m_spstorage != nullptr)
        spdb->m_spstorage = std::shared_ptr<StorageCache>(const_cast<StorageCache*>(m_spstorage->clone()));
    spdb->m_pdbSnapshot = m_pdbSnapshot;
    spdb->m_refCount = 1;
    // Note: stamped with the current MVCC timestamp, not the mvccCheckpoint argument
    spdb->m_mvccCheckpoint = getMvccTstamp();
    ...
    // Create fresh dicts for the live DB
    m_pdict = dictCreate(&dbDictType,this);
    dictExpand(m_pdict, 1024);   // minimize rehash overhead
    m_pdictTombstone = dictCreate(&dbTombstoneDictType, this);

    serverAssert(spdb->m_pdict->pauserehash == 1);

    // Link the newly created snapshot at the head of the snapshot chain
    m_spdbSnapshotHOLDER = std::move(spdb);
    m_pdbSnapshot = m_spdbSnapshotHOLDER.get();

    // Finally we need to take a ref on all our children snapshots.  This ensures they aren't free'd before we are
    redisDbPersistentData *pdbSnapshotNext = m_pdbSnapshot->m_spdbSnapshotHOLDER.get();
    while (pdbSnapshotNext != nullptr)
    {
        pdbSnapshotNext->m_refCount++;
        pdbSnapshotNext = pdbSnapshotNext->m_spdbSnapshotHOLDER.get();
    }

    if (m_pdbSnapshotASYNC != nullptr)
    {
        // free the async snapshot, it's done its job
        endSnapshot(m_pdbSnapshotASYNC);    // should be just a dec ref (FAST)
        m_pdbSnapshotASYNC = nullptr;
    }
    ...
}

创建完snapshot之后,就可以通过rdbSave对该snapshot生成RDB文件,核心函数为iterate_threadsafe,首先遍历当前dict,然后遍历链表中的其他snapshot:

// Entry point for walking a snapshot: forwards to iterate_threadsafe_core,
// flagging this call as the first (outermost) level of the walk.
// Returns whatever iterate_threadsafe_core returns.
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const
{
    const bool fFirst = true;   // the core passes false when it recurses into further snapshots
    const bool fResult = iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, fFirst);
    return fResult;
}
// Excerpt (elided parts are marked with "..."): calls `fn` for every entry of
// this snapshot's dict, then recurses into the next snapshot in the chain,
// hiding keys that are tombstoned at this level.
//
// fn:        callback invoked with (key, value); returning false stops the walk.
// fKeyOnly / fCacheOnly: forwarded unchanged to the recursive call; their use is
//            in elided code.
// fFirst:    true for the outermost call, false for recursive calls.
// `celem`, `dictTombstone`, `fResult` and `de` are declared in elided code, as is
// the final return statement.
bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(const char*, robj_roptr o)> &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const {
	...
	// First walk the entries of the current snapshot
	dictIterator *di = dictGetSafeIterator(m_pdict);
    while(fResult && ((de = dictNext(di)) != nullptr))
    {
        --celem;
        robj *o = (robj*)dictGetVal(de);
        if (!fn((const char*)dictGetKey(de), o))
            fResult = false;    // callback asked to stop
    }
    dictReleaseIterator(di);
    ...
    // Recursively walk the remaining snapshots in the chain
    const redisDbPersistentDataSnapshot *psnapshot;
    __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
    if (fResult && psnapshot != nullptr)
    {
        // Wrap the caller's callback so deeper levels skip keys deleted at this level
        std::function<bool(const char*, robj_roptr o)> fnNew = [&fn, &celem, dictTombstone](const char *key, robj_roptr o) {
            // Skip the key if it has been marked as deleted (tombstoned)
            dictEntry *deTombstone = dictFind(dictTombstone, key);
            if (deTombstone != nullptr)
                return true;

            // Alright it's a key in the use keyspace, lets ensure it and then pass it off
            --celem;
            return fn(key, o);
        };
        fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false);
    }
}

snapshot的CRUD

有了snapshot之后,对db的CRUD(增删改查)都要考虑snapshot。
新增KV对:

// Adds `key` -> `o` to the live dict of this DB.
//
// fAssumeNew:     the caller asserts the key does not exist yet; the ensure()
//                 call is skipped and a failed add trips the assertion.
// piterExisting:  optional out-parameter; on failure it receives an iterator
//                 built from the entry reported by dictAdd.
// Returns true when the key was added, false when dictAdd did not return DICT_OK.
bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew, dict_iter *piterExisting)
{
    if (!fAssumeNew)
    {
        // Only call ensure() when a storage provider or a snapshot exists
        const bool fHasBackingSource = (g_pserver->m_pstorageFactory != nullptr || m_pdbSnapshot != nullptr);
        if (fHasBackingSource)
            ensure(key);
    }

    // The new entry always goes straight into the live dict
    dictEntry *de;
    int res = dictAdd(m_pdict, key, o, &de);
    serverAssert(FImplies(fAssumeNew, res == DICT_OK));

    if (res != DICT_OK)
    {
        // Not added: hand the entry reported by dictAdd back to the caller if requested
        if (piterExisting != nullptr)
            *piterExisting = dict_iter(m_pdict, de);
        return false;
    }

#ifdef CHECKED_BUILD
    // Checked builds: a key that is still present in the snapshot must already be tombstoned
    if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(key) != nullptr)
    {
        serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
    }
#endif
    trackkey(key, false /* fUpdate */);
    return true;
}

查找KV对:

  // Looks up `key` and returns an iterator over the live dict for it.
  // The live dict is searched first; ensure() then receives the address of the
  // entry pointer (the accompanying text says it searches the snapshots).
  dict_iter find(const char *key) {
    dictEntry *deFound = dictFind(m_pdict, key);
    ensure(key, &deFound);
    return dict_iter(m_pdict, deFound);
  }

ensure是递归查找各个snapshot中是否存在该key,首先查找snapshot中的m_pdictTombstone,如果已经被标记删除了,则不再需要继续查找。
更新KV对:

// Sets `key` to `val` in `db`: tries to add the key, and overwrites the existing
// entry when the add does not succeed.
//
// keepttl: when zero, dbOverwrite is passed true for its fourth argument.
// signal:  when non-zero, signalModifiedKey is called for the key.
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
    // Let the DB prepare for a possible overwrite of a key that a snapshot still holds
    db->prepOverwriteForSnapshot(szFromObj(key));

    dict_iter itExisting;
    const bool fAddedAsNew = dbAddCore(db, szFromObj(key), val, true /* fUpdateMvcc */, false /*fAssumeNew*/, &itExisting);
    if (!fAddedAsNew) {
        // Add failed: overwrite using the iterator filled in by dbAddCore
        dbOverwrite(db, key, val, !keepttl, &itExisting);
    }

    incrRefCount(val);

    if (signal != 0)
        signalModifiedKey(c,db,key);
}
void redisDbPersistentData::prepOverwriteForSnapshot(char *key)
{
    if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU)
        return;

    if (m_pdbSnapshot != nullptr)
    {
    	// 如果snapshot中有,则在m_pdictTombstone插入该key,表示该key需要被删除
        auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
        if (itr.key() != nullptr)
        {
            if (itr.val()->FExpires()) {
                // Note: I'm sure we could handle this, but its too risky at the moment.
                //  There are known bugs doing this with expires
                return;
            }
            sds keyNew = sdsdupshared(itr.key());
            if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK)
                sdsfree(keyNew);
        }
    }
}

删除KV对

// Excerpt (elided parts are marked with "..."): deletes `key` from this DB.
// The key is erased from the storage cache (when one is set) and from the live
// dict. If either removal succeeded and the snapshot still holds the key, a
// tombstone is recorded for it.
// Returns 1 (true) when something was deleted, 0 (false) otherwise.
bool redisDbPersistentData::syncDelete(robj *key)
{
    ...
    if (m_spstorage != nullptr)
        fDeleted = m_spstorage->erase(szFromObj(key));
    // Delete from the live dict first; keep fDeleted true if the storage erase succeeded
    fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;

    if (fDeleted) {
        ...
        if (m_pdbSnapshot != nullptr)
        {
            // A snapshot exists: if it still holds this key, add a deletion marker
            // (tombstone) for the key to this DB's m_pdictTombstone
            auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key));
            if (itr != nullptr)
            {
                sds keyTombstone = sdsdupshared(itr.key());
                // The key's hash is stored as the tombstone's value
                uint64_t hash = dictGetHash(m_pdict, keyTombstone);
                if (dictAdd(m_pdictTombstone, keyTombstone, (void*)hash) != DICT_OK)
                    sdsfree(keyTombstone);  // add failed: release our key reference
            }
        }
        if (g_pserver->cluster_enabled) slotToKeyDel(szFromObj(key));
        return 1;
    } else {
        return 0;
    }
}

snapshot的GC

当不再需要snapshot时,通过endSnapshot把snapshot的dict与当前db的dict进行merge来回收snapshot;在merge之前,先从snapshot的dict中删除已经被标记为删除的key:

// Excerpt (elided parts are marked with "..."): folds the first-level snapshot
// back into the live DB in three stages:
//   1. apply the live DB's tombstones to the snapshot's dict,
//   2. merge the live dict into the snapshot's dict,
//   3. swap dicts so the live DB ends up with the merged dict.
// NOTE(review): `psnapshot` is not referenced in the visible part of the
// function; its use is presumably in elided code.
void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
{
	// The first-level snapshot has a single reference left, so it can be released;
	// first recursively free the snapshots further down the chain
    if (m_spdbSnapshotHOLDER->m_refCount == 1)
        recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());
      // Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
    dictIterator *di = dictGetIterator(m_pdictTombstone);
    dictEntry *de;
    dictPauseRehashing(m_spdbSnapshotHOLDER->m_pdict);
    auto splazy = std::make_unique<LazyFree>();
    while ((de = dictNext(di)) != NULL)
    {
        dictEntry **dePrev;
        dictht *ht;
        // BUG BUG Why not a shallow search?
        // The tombstone's value holds the key's hash, passed here to the lookup
        dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/);
        if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot)
        {
            // The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt)
#ifdef CHECKED_BUILD
            serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr);
#endif
            // Pass the key to delete down into the snapshot's own tombstone dict
            dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de));
            continue;
        }
        else if (deSnapshot == nullptr)
        {
            serverAssert(m_spdbSnapshotHOLDER->m_spstorage != nullptr); // the only case where we can have a tombstone without a snapshot child is if a storage engine is set
            continue;
        }
        
        // Delete the object from the source dict, we don't use dictDelete to avoid a second search
        *dePrev = deSnapshot->next; // Unlink it first
        // NOTE(review): deSnapshot was already dereferenced above and both nullptr
        // cases `continue`, so this check is always true here
        if (deSnapshot != nullptr) {
            if (m_spdbSnapshotHOLDER->m_pdict->asyncdata != nullptr) {
                dictFreeUnlinkedEntry(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
            } else {
                // Otherwise queue the unlinked entry on the LazyFree object
                splazy->vecde.push_back(deSnapshot);
            }
        }
        ht->used--;
    }
    // Stage 2 Move all new keys to the snapshot DB
    // Merge the live dict into the snapshot's dict
    dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict);
    
    // Stage 3 swap the databases with the snapshot
    // After the swap the live DB uses the merged dict
    std::swap(m_pdict, m_spdbSnapshotHOLDER->m_pdict);
    if (m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
        std::swap(m_pdictTombstone, m_spdbSnapshotHOLDER->m_pdictTombstone);
     ...
}

MVCC的其他应用

除了替代fork生成rdb,MVCC在keys、scan命令也有应用,这两个命令都是需要当前DB的快照
keys:

// KEYS command handler. When an optional snapshot of the client's DB can be
// taken, the client is blocked, the key scan runs on the async work queue
// against that snapshot, and the client is unblocked from a function posted
// back to the originating event loop. Otherwise the scan runs inline against
// the live DB.
void keysCommand(client *c) {
    sds pattern = szFromObj(c->argv[1]);

    // A snapshot is only attempted for clients that are neither in MULTI nor blocked
    const bool fMayUseSnapshot = !(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED));
    const redisDbPersistentDataSnapshot *snapshot =
        fMayUseSnapshot ? c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */) : nullptr;

    if (snapshot == nullptr)
    {
        // No snapshot available: run the scan directly against the live DB
        keysCommandCore(c, c->db, pattern);
        return;
    }

    // The pattern is duplicated for the worker, which frees its copy when done
    sds patternOwned = sdsdup(pattern);
    aeEventLoop *elOrigin = serverTL->el;
    blockClient(c, BLOCKED_ASYNC);
    // Capture the db now so the snapshot is ended on the db that created it
    redisDb *dbOrigin = c->db;

    // Scan the snapshot on the async work queue
    g_pserver->asyncworkqueue->AddWorkFunction([elOrigin, c, dbOrigin, patternOwned, snapshot]{
        keysCommandCore(c, snapshot, patternOwned);
        sdsfree(patternOwned);
        aePostFunction(elOrigin, [c, dbOrigin, snapshot]{
            aeReleaseLock();    // we need to lock with coordination of the client

            std::unique_lock<decltype(c->lock)> clientLock(c->lock);
            AeLocker aeLocker;
            aeLocker.arm(c);

            unblockClient(c);

            aeLocker.disarm();
            clientLock.unlock();
            dbOrigin->endSnapshotAsync(snapshot);
            aeAcquireLock();
        });
    });
}

scan命令通过client::asyncCommand方法实现:

bool client::asyncCommand(
    std::function<void(const redisDbPersistentDataSnapshot *,
                       const std::vector<robj_sharedptr> &)> &&mainFn,
    std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn) {
  serverAssert(FCorrectThread(this));
  const redisDbPersistentDataSnapshot *snapshot = nullptr;
  if (!(this->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
    snapshot =
        this->db->createSnapshot(this->mvccCheckpoint, false /* fOptional */);
  if (snapshot == nullptr) {
    return false;
  }
  aeEventLoop *el = serverTL->el;
  blockClient(this, BLOCKED_ASYNC);
  g_pserver->asyncworkqueue->AddWorkFunction(
      [el, this, mainFn, postFn, snapshot] {
        std::vector<robj_sharedptr> args = clientArgs(this);
        aePostFunction(el, [this, mainFn, postFn, snapshot, args] {
          aeReleaseLock();
          std::unique_lock<decltype(this->lock)> lock(this->lock);
          AeLocker locker;
          locker.arm(this);
          unblockClient(this);
          mainFn(snapshot, args);
          locker.disarm();
          lock.unlock();
          if (postFn)
            postFn(snapshot);
          this->db->endSnapshotAsync(snapshot);
          aeAcquireLock();
        });
      });
  return true;
}
 类似资料: