
How the server uses fbthrift: a ThriftServer walkthrough

韩欣怿
2023-12-01


/*
DEFINE_int32(num_netio_threads, 0,
             "Number of networking threads, 0 for number of physical CPU cores");
DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
*/

auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio");
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(
    FLAGS_num_netio_threads, std::move(threadFactory));

gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setIOThreadPool(ioThreadPool);

// The handler type was lost in transcription; `GraphService` matches the
// nebula-graphd context of this excerpt.
auto interface = std::make_shared<GraphService>();
status = interface->init(ioThreadPool);
if (!status.ok()) {
    LOG(ERROR) << status;
    return EXIT_FAILURE;
}

gServer->setInterface(std::move(interface));
gServer->setAddress(localIP, FLAGS_port);
// fbthrift-2018.08.20 always enables SO_REUSEPORT once `setReusePort' is called,
// which has been fixed in later versions.
if (FLAGS_reuse_port) {
    gServer->setReusePort(FLAGS_reuse_port);
}
gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
gServer->setCPUWorkerThreadName("executor");
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
gServer->setThreadStackSizeMB(5);

FLOG_INFO("Starting nebula-graphd on %s:%d\n", localIP.c_str(), FLAGS_port);
try {
    gServer->serve();  // Blocking wait until shut down via gServer->stop()
} catch (const std::exception &e) {
    FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what());
    return EXIT_FAILURE;
}
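
Since serve() blocks, something outside the call has to stop the server. A minimal sketch of the usual pattern (the signal wiring below is an assumption for illustration, not part of the excerpt): gServer->stop() wakes the server up and lets serve() return.

#include <csignal>

// Hypothetical handler: forward SIGINT/SIGTERM to the server.
static void stopServer(int sig) {
    if (gServer != nullptr) {
        gServer->stop();  // unblocks the gServer->serve() call above
    }
}

// In main(), before calling gServer->serve():
::signal(SIGINT, stopServer);
::signal(SIGTERM, stopServer);

Strictly speaking only async-signal-safe calls belong in a handler; production daemons often just set a flag here, or use folly::AsyncSignalHandler to run the stop on an EventBase.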

What ThriftServer.h provides: the server-side ThriftServer class.

Configuring the server-side handler: setInterface. The handler registered here is a single instance shared by every request; what is created anew is the processor: AsyncProcessorFactory::getProcessor() returns a fresh AsyncProcessor that dispatches calls onto that shared handler (see the sketch after the optional snippet below).

Optionally, a processor factory can be installed directly (the handler type below was lost in transcription and is reconstructed from the kvStore context):

auto handler = std::make_shared<StorageServiceHandler>(kvStore);
auto proc_factory = std::make_shared<
    ThriftServerAsyncProcessorFactory<StorageServiceHandler>>(handler);
server->setProcessorFactory(proc_factory);
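
To make the shared-handler point concrete, here is a minimal sketch. EchoService and its generated base class EchoServiceSvIf are assumptions for illustration (fbthrift generates a <Service>SvIf interface per service); the point is that one handler object is called from many worker threads, so its mutable state must be thread-safe.

#include <atomic>
#include <memory>
#include <string>

// Hypothetical handler for an assumed `string echo(1: string msg)` method.
class EchoHandler : public EchoServiceSvIf {
public:
    // Sync override; runs on a CPU worker thread.
    void echo(std::string& result, std::unique_ptr<std::string> msg) override {
        // Shared state: this counter is hit by every request on this server.
        count_.fetch_add(1, std::memory_order_relaxed);
        result = *msg;
    }

private:
    std::atomic<uint64_t> count_{0};
};

// Registered once; the same instance then serves all requests:
// gServer->setInterface(std::make_shared<EchoHandler>());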

IP and port: setAddress.

Configuring the IO thread pool: setIOThreadPool. This pool handles IO work such as network connections; the number of IO threads defaults to the number of threads the pool was created with.
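
One detail worth noting: folly::IOThreadPoolExecutor does not itself interpret 0 as "all cores", so the "0 for number of physical CPU cores" flag has to be resolved by the caller. A minimal sketch of that resolution (the resolution step is an assumption; the excerpt above passes the flag straight through):

#include <thread>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/executors/thread_factory/NamedThreadFactory.h>

// Resolve "0 means all cores" before constructing the pool.
int numNetioThreads = FLAGS_num_netio_threads > 0
    ? FLAGS_num_netio_threads
    : static_cast<int>(std::thread::hardware_concurrency());

auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(
    numNetioThreads, std::make_shared<folly::NamedThreadFactory>("graph-netio"));
gServer->setIOThreadPool(ioThreadPool);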

Configuring the number and name of the CPU worker threads: setNumCPUWorkerThreads and setCPUWorkerThreadName. These threads run non-IO work such as query execution. setNumCPUWorkerThreads and setThreadManager are mutually exclusive; set only one of them, because once setNumCPUWorkerThreads is configured the server builds a ThreadManager internally (a sketch of the explicit setThreadManager alternative follows the setupThreadManager excerpt below).

Inside serve(): using the value configured via setNumCPUWorkerThreads (read back with getNumCPUWorkerThreads), the server creates a ThreadManager and then calls setThreadManager:

void ThriftServer::setupThreadManager() {
  if (!threadManager_) {
    std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
        PriorityThreadManager::newPriorityThreadManager(
            getNumCPUWorkerThreads(), true /*stats*/));
    threadManager->enableCodel(getEnableCodel());
    // If a thread factory has been specified, use it.
    if (threadFactory_) {
      threadManager->threadFactory(threadFactory_);
    }
    auto poolThreadName = getCPUWorkerThreadName();
    if (!poolThreadName.empty()) {
      threadManager->setNamePrefix(poolThreadName);
    }
    threadManager->start();
    setThreadManager(threadManager);
  }
}
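
As noted above, the explicit alternative is to skip setNumCPUWorkerThreads and hand the server a ThreadManager you built yourself. A minimal sketch using fbthrift's concurrency API (the exact newSimpleThreadManager signature varies between fbthrift versions, so treat this as an approximation):

#include <thrift/lib/cpp/concurrency/ThreadManager.h>
#include <thrift/lib/cpp/concurrency/PosixThreadFactory.h>

using apache::thrift::concurrency::ThreadManager;
using apache::thrift::concurrency::PosixThreadFactory;

// Build and start a simple (non-priority) thread manager ourselves...
auto threadManager = ThreadManager::newSimpleThreadManager(FLAGS_num_worker_threads);
threadManager->threadFactory(std::make_shared<PosixThreadFactory>());
threadManager->setNamePrefix("executor");  // same effect as setCPUWorkerThreadName
threadManager->start();

// ...and install it instead of calling setNumCPUWorkerThreads:
gServer->setThreadManager(threadManager);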

Configuring how long an established client connection may sit idle: setIdleTimeout. A connection idle past this timeout is closed; the default is to wait indefinitely.

Configuring the maximum backlog of pending connections: setListenBacklog. Once the queue of pending connections exceeds this value, the excess connections are dropped.

// The number of threads to execute user queries, 0 for number of CPU cores
gServer->setNumCPUWorkerThreads(FLAGS_num_worker_threads);
gServer->setCPUWorkerThreadName("executor");

// The number of threads to accept incoming connections
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
