使用案例
/*
DEFINE_int32(num_netio_threads, 0,
"Number of networking threads, 0 for number of physical CPU cores");
DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
*/
auto threadFactory = std::make_shared<:namedthreadfactory>("graph-netio");
auto ioThreadPool = std::make_shared<:iothreadpoolexecutor>(
FLAGS_num_netio_threads, std::move(threadFactory));
gServer = std::make_unique<:thrift::thriftserver>();
gServer->setIOThreadPool(ioThreadPool);
auto interface = std::make_shared();
status = interface->init(ioThreadPool);
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}
gServer->setInterface(std::move(interface));
gServer->setAddress(localIP, FLAGS_port);
// fbthrift-2018.08.20 always enables SO_REUSEPORT once `setReusePort' is called
// which had been fixed in later version.
if (FLAGS_reuse_port) {
gServer->setReusePort(FLAGS_reuse_port);
}
gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
gServer->setCPUWorkerThreadName("executor");
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
gServer->setThreadStackSizeMB(5);
FLOG_INFO("Starting nebula-graphd on %s:%d\n", localIP.c_str(), FLAGS_port);
try {
gServer->serve(); // Blocking wait until shut down via gServer->stop()
} catch (const std::exception &e) {
FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what());
return EXIT_FAILURE;
}
ThriftServer.h作用: ThriftServer服务端
配置服务端的处理器: setInterface。疑问(待确认):每次新的请求是否都使用同一个实例来处理?参考:AsyncProcessorFactory 的getProcessor返回一个新的实例
可选:
auto handler = std::make_shared(kvStore);
auto proc_factory = std::make_shared>(
handler);
server->setProcessorFactory(proc_factory);
ip,端口 : setAddress
配置IO线程池, 线程池用来处理IO任务,比如网络连接,IO线程数默认等于线程池中分配的线程数: setIOThreadPool
配置cpu线程数,和线程名称:用来处理非IO任务,比如计算任务:setNumCPUWorkerThreads、setCPUWorkerThreadName。 setNumCPUWorkerThreads和setThreadManager是互斥,只能选择设置一个。因为setNumCPUWorkerThreads设置后,内部会创建ThreadManager。
serve方法中:根据setNumCPUWorkerThreads的结果(getNumCPUWorkerThreads)
创建ThreadManager然后调用setThreadManager。
void ThriftServer::setupThreadManager() {
if (!threadManager_) {
std::shared_ptr<:thrift::concurrency::threadmanager> threadManager(
PriorityThreadManager::newPriorityThreadManager(
getNumCPUWorkerThreads(), true /*stats*/));
threadManager->enableCodel(getEnableCodel());
// If a thread factory has been specified, use it.
if (threadFactory_) {
threadManager->threadFactory(threadFactory_);
}
auto poolThreadName = getCPUWorkerThreadName();
if (!poolThreadName.empty()) {
threadManager->setNamePrefix(poolThreadName);
}
threadManager->start();
setThreadManager(threadManager);
}
}
配置客户端连接的最大空闲时间(idle timeout),连接空闲超过该时间则被断开,默认一直等待:setIdleTimeout
配置最大积压的待连接数,如果积压的连接超过这个值,则超出部分将会被抛弃:setListenBacklog
// The number of threads to execute user queries, 0 for the number of CPU cores
gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
gServer->setCPUWorkerThreadName("executor");
// The number of threads to accept incoming connections
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);