当前位置: 首页 > 工具软件 > KeyDB > 使用案例 >

KeyDB源码解析一——网络模型

孙宏壮
2023-12-01

KeyDB是Redis的多线程版本,在官网上QPS号称是Redis的5x以上,当然这是不限制CPU核数的情况下,Redis的单线程模型使得对于CPU的使用能力有限,KeyDB通过多线程的方式,尽可能的发挥多核CPU的潜力,提升系统的吞吐。

线程模型

在Redis中,由主线程处理客户端的连接和请求,同时在主线程中做一些后台任务,比如:过期键、内存淘汰等,如果是集群模式,还用来处理gossip消息。另外还有3个异步线程,用来删除键值对后,释放内存、fsync aof文件、关闭文件。
KeyDB将Redis中的任务重新划分,其中处理客户端的连接和请求是提高吞吐的关键,由多个worker线程处理,用一个主线程处理后台任务和集群模式下的gossip消息。

worker线程定义

struct redisServerThreadVars {
  aeEventLoop *el = nullptr;
  socketFds ipfd;  /* TCP socket file descriptors */
  socketFds tlsfd; /* TLS socket file descriptors */
  int in_eval;     /* Are we inside EVAL? */
  int in_exec;     /* Are we inside EXEC? */
  std::vector<client *>
      clients_pending_write; /* There is to write or install handler. */
  list *unblocked_clients;   /* list of clients to unblock before next loop NOT
                                THREADSAFE */
  list *clients_pending_asyncwrite;
  int cclients;
  int cclientsReplica = 0;
  client *current_client;       /* Current client */
  long fixed_time_expire = 0;   /* If > 0, expire keys against server.mstime. */
  client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
  struct fastlock lockPendingWrite {
    "thread pending write"
  };
  char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
  long unsigned commandsExecuted = 0;
  GarbageCollectorCollection::Epoch gcEpoch;
  const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
  long long stat_total_error_replies; /* Total number of issued error replies (
                                         command + rejected errors ) */
  long long
      prev_err_count; /* per thread marker of exisiting errors during a call */
  bool fRetrySetAofEvent = false;
  bool modulesEnabledThisAeLoop =
      false; /* In this loop of aeMain, were modules enabled before
                the thread went to sleep? */
  bool disable_async_commands =
      false; /* this is only valid for one cycle of the AE loop and is reset in
                afterSleep */
  std::vector<client *> vecclientsProcess;
  dictAsyncRehashCtl *rehashCtl = nullptr;

  int getRdbKeySaveDelay();

private:
  int rdb_key_save_delay = -1; // thread local cache
};

可以看出是把Redis中与处理客户端请求的部分拿出来,再加上一些KeyDB自己的特性。

在KeyDB启动时,根据配置文件创建指定的worker线程数,这些线程同时负责监听客户端的连接,读取客户端发送的请求并处理。

int main(int argc, char** argv) {
	...
	// 初始化工作线程
	for (int iel = 0; iel < cserver.cthreads; ++iel) {
	  initServerThread(g_pserver->rgthreadvar + iel, iel == IDX_EVENT_LOOP_MAIN);
	}
	...
	// 设置worker线程的工作函数,并设置CPU亲和性
	for (int iel = 0; iel < cserver.cthreads; ++iel) {
    pthread_create(g_pserver->rgthread + iel, &tattr, workerThreadMain,
                   (void *)((int64_t)iel));
    if (cserver.fThreadAffinity) {
#ifdef __linux__
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(iel + cserver.threadAffinityOffset, &cpuset);
      if (pthread_setaffinity_np(g_pserver->rgthread[iel], sizeof(cpu_set_t),
                                 &cpuset) == 0) {
        serverLog(LL_NOTICE, "Binding thread %d to cpu %d", iel,
                  iel + cserver.threadAffinityOffset + 1);
      }
#else
      serverLog(LL_WARNING, "CPU pinning not available on this platform");
#endif
    }
}

// 给redisServerThreadVars设置变量
static void initServerThread(struct redisServerThreadVars *pvar, int fMain) {
  pvar->unblocked_clients = listCreate();
  pvar->clients_pending_asyncwrite = listCreate();
  pvar->ipfd.count = 0;
  pvar->tlsfd.count = 0;
  pvar->cclients = 0;
  pvar->in_eval = 0;
  pvar->in_exec = 0;
  // 每个工作线程创建一个eventloop
  pvar->el = aeCreateEventLoop(g_pserver->maxclients + CONFIG_FDSET_INCR);
  pvar->current_client = nullptr;
  pvar->fRetrySetAofEvent = false;
  if (pvar->el == NULL) {
    serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'",
              strerror(errno));
    exit(1);
  }
  aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE);
  aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);

  // 初始化线程锁
  fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite");

  // 非主线程,初始化周期性函数
  if (!fMain) {
    if (aeCreateTimeEvent(pvar->el, 1, serverCronLite, NULL, NULL) == AE_ERR) {
      serverPanic("Can't create event loop timers.");
      exit(1);
    }
  }

  /* Register a readable event for the pipe used to awake the event loop
   * when a blocked client in a module needs attention. */
  if (aeCreateFileEvent(pvar->el, g_pserver->module_blocked_pipe[0],
                        AE_READABLE, moduleBlockedClientPipeReadable,
                        NULL) == AE_ERR) {
    serverPanic("Error registering the readable event for the module "
                "blocked clients subsystem.");
  }
}

重点看下线程工作函数workerThreadMain,主要做了两件事:初始化客户端连接、开启事件循环

void *workerThreadMain(void *parg) {
  int iel = (int)((int64_t)parg);
  serverLog(LL_NOTICE, "Thread %d alive.", iel);
  // 设置线程参数
  serverTL = g_pserver->rgthreadvar + iel; // set the TLS threadsafe global
  tlsInitThread();

  if (iel != IDX_EVENT_LOOP_MAIN) {
    // 初始化工作线程网络连接
    aeThreadOnline();
    aeAcquireLock();
    initNetworkingThread(iel, cserver.cthreads > 1);
    aeReleaseLock();
    aeThreadOffline();
  }

  moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't
                          // be called on the first run
  aeThreadOnline();
  // 开始事件循环
  aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
  try {
    aeMain(el);
  } catch (ShutdownException) {
  }
  aeThreadOffline();
  moduleReleaseGIL(true);
  serverAssert(!GlobalLocksAcquired());
  aeDeleteEventLoop(el);

  tlsCleanupThread();
  return NULL;
}

事件循环和Redis中差不多,在初始化线程结构体时,已经设置好了事件函数。

客户端与worker线程绑定

初始化worker线程中,会对客户端的连接做accept处理,然后创建Connection用来处理客户端后续请求:

static void initNetworkingThread(int iel, int fReusePort) {
  /* Open the TCP listening socket for the user commands. */
  // listen
  if (fReusePort || (iel == IDX_EVENT_LOOP_MAIN)) {
    if (g_pserver->port != 0 &&
        listenToPort(g_pserver->port, &g_pserver->rgthreadvar[iel].ipfd,
                     fReusePort, (iel == IDX_EVENT_LOOP_MAIN)) == C_ERR) {
      serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.",
                g_pserver->port);
      exit(1);
    }
    if (g_pserver->tls_port != 0 &&
        listenToPort(g_pserver->tls_port, &g_pserver->rgthreadvar[iel].tlsfd,
                     fReusePort, (iel == IDX_EVENT_LOOP_MAIN)) == C_ERR) {
      serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.",
                g_pserver->port);
      exit(1);
    }
  } else {
    // We use the main threads file descriptors
    memcpy(&g_pserver->rgthreadvar[iel].ipfd,
           &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd,
           sizeof(socketFds));
    g_pserver->rgthreadvar[iel].ipfd.count =
        g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd.count;
  }

  /* Create an event handler for accepting new connections in TCP */
  // 每个线程创建连接处理函数
  for (int j = 0; j < g_pserver->rgthreadvar[iel].ipfd.count; j++) {
    if (aeCreateFileEvent(g_pserver->rgthreadvar[iel].el,
                          g_pserver->rgthreadvar[iel].ipfd.fd[j],
                          AE_READABLE | AE_READ_THREADSAFE, acceptTcpHandler,
                          NULL) == AE_ERR) {
      serverPanic("Unrecoverable error creating g_pserver->ipfd file event.");
    }
  }
  ...
}

// 用户连接处理函数
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(mask);
    UNUSED(privdata);
    UNUSED(el);

    while(max--) {
        cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", serverTL->neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);

        acceptOnThread(connCreateAcceptedSocket(cfd), 0, cip);
    }
}

最后通过acceptOnThread将accept到的用户连接指定给某个worker线程,这里的考虑主要是尽可能的均匀给每个worker线程分配用户连接,同时考虑下特殊的用户连接,比如:主客户端。

int chooseBestThreadForAccept()
{
    int ielMinLoad = 0;
    int cclientsMin = INT_MAX;
    for (int iel = 0; iel < cserver.cthreads; ++iel)
    {
        // 选择accept个数最小的连接
        int cclientsThread;
        atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
        cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
        // Note: Its repl factor less one because cclients also includes replicas, so we don't want to double count
        // 考虑主从复制的扇出
        cclientsThread += (g_pserver->rgthreadvar[iel].cclientsReplica) * (g_pserver->replicaIsolationFactor-1);
        if (cclientsThread < cserver.thread_min_client_threshold)
            return iel;
        if (cclientsThread < cclientsMin)
        {
            cclientsMin = cclientsThread;
            ielMinLoad = iel;
        }
    }
    return ielMinLoad;
}

最后与Redis类似,调用acceptCommonHandler创建Client,并在在client中设置读取用户请求读取函数:

static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) {
    client *c;
    char conninfo[100];
    UNUSED(ip);
    AeLocker locker;
    locker.arm(nullptr);
    ...
    /* Limit the number of connections we take at the same time.
     *
     * Admission control will happen before a client is created and connAccept()
     * called, because we don't want to even start transport-level negotiation
     * if rejected. */
    if (listLength(g_pserver->clients) + getClusterConnectionsCount()
        >= g_pserver->maxclients)
    {
        const char *err;
        if (g_pserver->cluster_enabled)
            err = "-ERR max number of clients + cluster "
                  "connections reached\r\n";
        else
            err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors.
         * Note that for TLS connections, no handshake was done yet so nothing
         * is written and the connection will just drop. */
        if (connWrite(conn,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        g_pserver->stat_rejected_conn++;
        connClose(conn);
        return;
    }

    /* Create connection and client */
    // 创建client
    if ((c = createClient(conn, iel)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }

    /* Last chance to keep flags */
    c->flags |= flags;
	...
}

client *createClient(connection *conn, int iel) {
    client *c = new client;
    serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar)));

    c->iel = iel;
    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    // 
    if (conn) {
        serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (cserver.tcpkeepalive)
            connKeepAlive(conn,cserver.tcpkeepalive);
        // 设置读取函数
        connSetReadHandler(conn, readQueryFromClient, true);
        connSetPrivateData(conn, c);
    }

    selectDb(c,0);
    uint64_t client_id;
    ...
}

可以看出,这个处理函数readQueryFromClient与Redis中一样,剩下这个客户端所有的请求都由这个worker线程去处理,接下来就是读取用户网络数据,解析请求,处理请求,最后给客户端发回响应,最后看看发送响应给客户端的过程,通过addReply函数给用户发送响应,此时,可能是client绑定的worker线程调用,也有可能是非client绑定的线程调用,这样发送响应时分成两种:

void addReply(client *c, robj_roptr obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    ...
}

int prepareClientToWrite(client *c) {
	bool fAsync = !FCorrectThread(c);
	...
	if (!fAsync && (c->flags & CLIENT_SLAVE || !clientHasPendingReplies(c))) clientInstallWriteHandler(c);
    if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
    ...
}

clientHasPendingReplies的响应在每个worker线程的beforeSleep中通过handleClientsWithPendingWrites处理,clientInstallAsyncWriteHandler会在全局的异步发送列追加该client,然后由对应的worker线程再把要发送的client转发给与之绑定的worker线程。
至此,KeyDB的多线程网络模型,已经对应的用户请求处理流程分析完毕。

 类似资料: