KeyDB is a multi-threaded fork of Redis. Its website advertises more than 5x the QPS of Redis, though that figure assumes the CPU core count is not constrained. Redis's single-threaded model limits how much CPU it can actually use; KeyDB uses multiple threads to exploit multi-core CPUs as fully as possible and raise overall throughput.
In Redis, the main thread handles client connections and requests, and also runs background tasks such as expiring keys and memory eviction; in cluster mode it processes gossip messages as well. In addition there are three asynchronous background threads that free memory after key deletion, fsync the AOF file, and close files.
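For reference, these three jobs correspond to the background-job types declared in Redis's bio.h (reproduced from memory here, so treat the exact layout as indicative rather than authoritative):
/* Background job types executed by the bio threads (see bio.h). */
#define BIO_CLOSE_FILE  0 /* Deferred close(2) of files. */
#define BIO_AOF_FSYNC   1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE   2 /* Deferred object freeing. */
#define BIO_NUM_OPS     3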
KeyDB re-partitions this work. Handling client connections and requests is the key to throughput, so it is spread across multiple worker threads, while a single main thread takes care of background tasks and, in cluster mode, gossip messages. Each worker thread carries its own state in redisServerThreadVars:
struct redisServerThreadVars {
aeEventLoop *el = nullptr;
socketFds ipfd; /* TCP socket file descriptors */
socketFds tlsfd; /* TLS socket file descriptors */
int in_eval; /* Are we inside EVAL? */
int in_exec; /* Are we inside EXEC? */
std::vector<client *>
clients_pending_write; /* There is to write or install handler. */
list *unblocked_clients; /* list of clients to unblock before next loop NOT
THREADSAFE */
list *clients_pending_asyncwrite;
int cclients;
int cclientsReplica = 0;
client *current_client; /* Current client */
long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */
client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
struct fastlock lockPendingWrite {
"thread pending write"
};
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
long unsigned commandsExecuted = 0;
GarbageCollectorCollection::Epoch gcEpoch;
const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
long long stat_total_error_replies; /* Total number of issued error replies (
command + rejected errors ) */
  long long
      prev_err_count; /* per thread marker of existing errors during a call */
bool fRetrySetAofEvent = false;
bool modulesEnabledThisAeLoop =
false; /* In this loop of aeMain, were modules enabled before
the thread went to sleep? */
bool disable_async_commands =
false; /* this is only valid for one cycle of the AE loop and is reset in
afterSleep */
std::vector<client *> vecclientsProcess;
dictAsyncRehashCtl *rehashCtl = nullptr;
int getRdbKeySaveDelay();
private:
int rdb_key_save_delay = -1; // thread local cache
};
As you can see, this is essentially the request-handling portion of Redis's server state pulled out into a per-thread structure, plus a few KeyDB-specific fields.
When KeyDB starts, it creates the number of worker threads specified in the configuration file; these threads both listen for client connections and read and process the requests those clients send.
int main(int argc, char** argv) {
...
  // Initialize the per-thread state (redisServerThreadVars) of every worker thread
for (int iel = 0; iel < cserver.cthreads; ++iel) {
initServerThread(g_pserver->rgthreadvar + iel, iel == IDX_EVENT_LOOP_MAIN);
}
...
  // Spawn the worker threads and optionally pin each one to a CPU core
for (int iel = 0; iel < cserver.cthreads; ++iel) {
pthread_create(g_pserver->rgthread + iel, &tattr, workerThreadMain,
(void *)((int64_t)iel));
if (cserver.fThreadAffinity) {
#ifdef __linux__
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(iel + cserver.threadAffinityOffset, &cpuset);
if (pthread_setaffinity_np(g_pserver->rgthread[iel], sizeof(cpu_set_t),
&cpuset) == 0) {
serverLog(LL_NOTICE, "Binding thread %d to cpu %d", iel,
iel + cserver.threadAffinityOffset + 1);
}
#else
serverLog(LL_WARNING, "CPU pinning not available on this platform");
#endif
}
  }
  ...
}
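The thread count and affinity flag consulted above (cserver.cthreads, cserver.fThreadAffinity) come from the configuration file; in keydb.conf the corresponding directives look roughly like this (values are illustrative):
# keydb.conf (illustrative)
server-threads 4             # number of worker event-loop threads
server-thread-affinity true  # pin each worker thread to a CPU core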
// Populate a thread's redisServerThreadVars structure
static void initServerThread(struct redisServerThreadVars *pvar, int fMain) {
pvar->unblocked_clients = listCreate();
pvar->clients_pending_asyncwrite = listCreate();
pvar->ipfd.count = 0;
pvar->tlsfd.count = 0;
pvar->cclients = 0;
pvar->in_eval = 0;
pvar->in_exec = 0;
  // Each worker thread gets its own event loop
pvar->el = aeCreateEventLoop(g_pserver->maxclients + CONFIG_FDSET_INCR);
pvar->current_client = nullptr;
pvar->fRetrySetAofEvent = false;
if (pvar->el == NULL) {
serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
  // Initialize this thread's pending-write lock
fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite");
  // Non-main threads register their own periodic timer (serverCronLite)
if (!fMain) {
if (aeCreateTimeEvent(pvar->el, 1, serverCronLite, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
}
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(pvar->el, g_pserver->module_blocked_pipe[0],
AE_READABLE, moduleBlockedClientPipeReadable,
NULL) == AE_ERR) {
serverPanic("Error registering the readable event for the module "
"blocked clients subsystem.");
}
}
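The serverCronLite timer registered above follows the standard ae time-event contract: the callback's return value is the delay, in milliseconds, before it fires again. A minimal sketch of that contract (exampleCronLite is a hypothetical stand-in, not KeyDB's actual function):
static int exampleCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData) {
  UNUSED(eventLoop);
  UNUSED(id);
  UNUSED(clientData);
  /* Per-thread housekeeping would go here (pending writes, expired clients, ...). */
  return 1000 / g_pserver->hz; /* Reschedule one cron tick later. */
}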
Now look at the thread entry point, workerThreadMain. It does two things: it initializes the thread's networking (so it can accept client connections) and then runs the event loop.
void *workerThreadMain(void *parg) {
int iel = (int)((int64_t)parg);
serverLog(LL_NOTICE, "Thread %d alive.", iel);
  // Bind this thread to its per-thread variables
serverTL = g_pserver->rgthreadvar + iel; // set the TLS threadsafe global
tlsInitThread();
if (iel != IDX_EVENT_LOOP_MAIN) {
    // Initialize this worker's networking (listening sockets and accept handler)
aeThreadOnline();
aeAcquireLock();
initNetworkingThread(iel, cserver.cthreads > 1);
aeReleaseLock();
aeThreadOffline();
}
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't
// be called on the first run
aeThreadOnline();
  // Enter the event loop
aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
try {
aeMain(el);
} catch (ShutdownException) {
}
aeThreadOffline();
moduleReleaseGIL(true);
serverAssert(!GlobalLocksAcquired());
aeDeleteEventLoop(el);
tlsCleanupThread();
return NULL;
}
The event loop itself works much as it does in Redis; the event handlers were already registered when the thread structures were initialized.
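Conceptually, each worker thread is now running the familiar Redis loop: run beforeSleep, poll for file and timer events, re-acquire locks in afterSleep, then dispatch the handlers. A simplified paraphrase of aeMain/aeProcessEvents (not the literal ae.c source):
// Simplified paraphrase of the per-thread event loop (see ae.c for the real thing).
void aeMainSketch(aeEventLoop *el) {
  el->stop = 0;
  while (!el->stop) {
    // beforeSleep(): flush pending replies, process async writes, release locks.
    // Poll (epoll/kqueue/...) until an fd is ready or the nearest timer is due.
    // afterSleep(): re-acquire the global lock / module GIL before handlers run.
    // Dispatch file events (e.g. readQueryFromClient) and time events (serverCronLite).
    aeProcessEvents(el, AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP);
  }
}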
When a worker thread is initialized, initNetworkingThread sets up the listening sockets and the accept handler; each accepted socket is wrapped in a connection that handles that client's subsequent requests:
static void initNetworkingThread(int iel, int fReusePort) {
/* Open the TCP listening socket for the user commands. */
// listen
if (fReusePort || (iel == IDX_EVENT_LOOP_MAIN)) {
if (g_pserver->port != 0 &&
listenToPort(g_pserver->port, &g_pserver->rgthreadvar[iel].ipfd,
fReusePort, (iel == IDX_EVENT_LOOP_MAIN)) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.",
g_pserver->port);
exit(1);
}
if (g_pserver->tls_port != 0 &&
listenToPort(g_pserver->tls_port, &g_pserver->rgthreadvar[iel].tlsfd,
fReusePort, (iel == IDX_EVENT_LOOP_MAIN)) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.",
g_pserver->port);
exit(1);
}
} else {
// We use the main threads file descriptors
memcpy(&g_pserver->rgthreadvar[iel].ipfd,
&g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd,
sizeof(socketFds));
g_pserver->rgthreadvar[iel].ipfd.count =
g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd.count;
}
/* Create an event handler for accepting new connections in TCP */
  // Register the accept handler on every listening fd of this thread
for (int j = 0; j < g_pserver->rgthreadvar[iel].ipfd.count; j++) {
if (aeCreateFileEvent(g_pserver->rgthreadvar[iel].el,
g_pserver->rgthreadvar[iel].ipfd.fd[j],
AE_READABLE | AE_READ_THREADSAFE, acceptTcpHandler,
NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating g_pserver->ipfd file event.");
}
}
...
}
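Note the fReusePort flag: when it is set, every worker opens its own listening socket on the same port instead of copying the main thread's descriptors. This presumably relies on the SO_REUSEPORT socket option, which lets the kernel load-balance incoming connections across the per-thread listeners. A self-contained sketch of the option (not KeyDB's anet code):
#include <sys/socket.h>

/* Illustrative only: enable SO_REUSEPORT so several threads can each bind a
 * listening socket to the same port and share the accept load. */
static int enableReusePort(int fd) {
  int yes = 1;
  return setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
}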
// Handler invoked when a listening socket becomes readable (a new client connection)
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(mask);
UNUSED(privdata);
UNUSED(el);
while(max--) {
cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", serverTL->neterr);
return;
}
anetCloexec(cfd);
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptOnThread(connCreateAcceptedSocket(cfd), 0, cip);
}
}
Finally, acceptOnThread assigns the accepted connection to a worker thread. The aim is to distribute client connections across the workers as evenly as possible, while also accounting for special connections such as the one from the master.
int chooseBestThreadForAccept()
{
int ielMinLoad = 0;
int cclientsMin = INT_MAX;
for (int iel = 0; iel < cserver.cthreads; ++iel)
{
        // Pick the thread currently serving the fewest clients, counting accepts still in flight
int cclientsThread;
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
// Note: Its repl factor less one because cclients also includes replicas, so we don't want to double count
        // Weight replica connections more heavily to account for replication fan-out
cclientsThread += (g_pserver->rgthreadvar[iel].cclientsReplica) * (g_pserver->replicaIsolationFactor-1);
if (cclientsThread < cserver.thread_min_client_threshold)
return iel;
if (cclientsThread < cclientsMin)
{
cclientsMin = cclientsThread;
ielMinLoad = iel;
}
}
return ielMinLoad;
}
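For completeness, here is a rough sketch of what acceptOnThread does with that choice. It reuses the identifiers from the excerpts above and is a simplification: the real function also handles the case where the cross-thread post fails, and the exact aePostFunction overload (assumed here to take a std::function) may differ.
// Sketch (not the verbatim KeyDB code): push the accepted connection onto the
// chosen worker's event loop, where acceptCommonHandler finishes the setup.
void acceptOnThreadSketch(connection *conn, int flags, const char *ip) {
  int iel = chooseBestThreadForAccept();
  rgacceptsInFlight[iel].fetch_add(1, std::memory_order_relaxed);
  std::string szIp = ip ? ip : ""; // copy: the caller's stack buffer won't outlive this call
  aePostFunction(g_pserver->rgthreadvar[iel].el, [conn, flags, szIp, iel] {
    rgacceptsInFlight[iel].fetch_sub(1, std::memory_order_relaxed);
    acceptCommonHandler(conn, flags, const_cast<char *>(szIp.c_str()), iel);
  });
}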
Finally, just as in Redis, acceptCommonHandler creates the client and installs the handler that reads the client's requests:
static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) {
client *c;
char conninfo[100];
UNUSED(ip);
AeLocker locker;
locker.arm(nullptr);
...
/* Limit the number of connections we take at the same time.
*
* Admission control will happen before a client is created and connAccept()
* called, because we don't want to even start transport-level negotiation
* if rejected. */
if (listLength(g_pserver->clients) + getClusterConnectionsCount()
>= g_pserver->maxclients)
{
const char *err;
if (g_pserver->cluster_enabled)
err = "-ERR max number of clients + cluster "
"connections reached\r\n";
else
err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors.
* Note that for TLS connections, no handshake was done yet so nothing
* is written and the connection will just drop. */
if (connWrite(conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
g_pserver->stat_rejected_conn++;
connClose(conn);
return;
}
/* Create connection and client */
  // Create the client object
if ((c = createClient(conn, iel)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
/* Last chance to keep flags */
c->flags |= flags;
...
}
client *createClient(connection *conn, int iel) {
client *c = new client;
serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar)));
c->iel = iel;
/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (cserver.tcpkeepalive)
connKeepAlive(conn,cserver.tcpkeepalive);
    // Install readQueryFromClient as this connection's read handler
connSetReadHandler(conn, readQueryFromClient, true);
connSetPrivateData(conn, c);
}
selectDb(c,0);
uint64_t client_id;
...
}
As you can see, the read handler readQueryFromClient is the same as in Redis, and from this point on all of this client's requests are handled by its worker thread: read the data from the socket, parse the request, execute it, and send a response back. Finally, look at how responses are sent. Replies go out through addReply, which may be called either by the worker thread the client is bound to or by some other thread, so sending splits into two paths:
void addReply(client *c, robj_roptr obj) {
if (prepareClientToWrite(c) != C_OK) return;
...
}
int prepareClientToWrite(client *c) {
bool fAsync = !FCorrectThread(c);
...
if (!fAsync && (c->flags & CLIENT_SLAVE || !clientHasPendingReplies(c))) clientInstallWriteHandler(c);
if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
...
}
In the synchronous path the pending replies are flushed in each worker thread's beforeSleep via handleClientsWithPendingWrites; in the asynchronous path clientInstallAsyncWriteHandler appends the client to the calling thread's clients_pending_asyncwrite list, and the queued data is later handed off to the worker thread that owns the client, which performs the actual send.
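The test that picks between the two paths is FCorrectThread: is the calling thread the one this client is bound to? Its gist can be sketched from the c->iel assignment seen in createClient (an approximation, not the verbatim helper):
/* Sketch: does the calling thread own this client? */
static bool FCorrectThreadSketch(client *c) {
  /* serverTL points at this thread's slot in rgthreadvar, so the offset
   * from the array base is the calling thread's index. */
  int ielCaller = (int)(serverTL - g_pserver->rgthreadvar);
  return c->conn == nullptr || c->iel == ielCaller;
}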
That concludes the walk-through of KeyDB's multi-threaded network model and the corresponding request-handling flow.