讲真的,开源出来的tars代码质量真的很一般。虽然原生的tars协议(jce协议),支持的比较好, 但是其他协议支持就很一般了,比如pb协议的支持,里面有几个bug, 包括命名空间的支持、rpc调用多个函数指针错乱,以及对协程不支持。anyway,内部版本的taf应该没有这么些问题,可能是负责开源的同学把它阉割过度了。只能说,tars开源的项目真的一点都不真诚。
再来讲讲tars的异步编程。对这种future/promise的编程范式,代码写起来不太舒服:代码碎片化,难以维护,难以理解。接触过那么多的rpc框架,tars的异步编程还是不太友好。虽然它是解决rpc并发调用的一种范式,但是却给开发同学带来了额外的理解成本。
相比之下,协程化,真的是一个不错的解决方案。协程化,可以实现异步调用代码同步写,所有代码流程都可以在一个函数里面完成。另外协程化,上下文切换损耗比较小,比异步编程的线程上下文切换更加友好,特别适用于IO型调度的业务使用。协程化,还可以减少代码量,减少代码的异常分支,更容易理解。
当然协程化,也会带来一些例如栈空间的问题,因为栈空间有限,会带来未知原因coredump的问题。
然后,看了一下,tars开源的原生tars(jce)协议生成工具是支持协程的,但是pb协议生成工具竟然不支持协程,只能照着原生的工具封装一个了。主要修改的目录如下:TarsCpp/tools/pb2tarscpp。
主要是参考tools/tars2cpp的代码,移植到tools/pb2tarscpp当中。具体如下:
diff --git a/tools/pb2tarscpp/CppGenCallback.cpp b/tools/pb2tarscpp/CppGenCallback.cpp
index 63800cb..a5cb00f 100644
--- a/tools/pb2tarscpp/CppGenCallback.cpp
+++ b/tools/pb2tarscpp/CppGenCallback.cpp
@@ -157,3 +157,126 @@ std::string GenPrxCallback(const ::google::protobuf::ServiceDescriptor* desc, in
return out;
}
+std::string GenCoroPrxCallback(const ::google::protobuf::ServiceDescriptor* desc, int indent) { // builds the source text of class <Service>CoroPrxCallback
+ std::string out;
+ out.reserve(8 * 1024); // pre-size the buffer; the generated class is appended piece by piece
+ // desc: service whose methods are emitted; indent: nesting level handed to LineFeed(). Returns the generated text.
+ const auto& name = desc->name();
+ const auto& pkg = desc->file()->package();
+ out += "/* callback of coroutine async proxy for client */";
+ out += LineFeed(indent);
+ out += "class " + name + "CoroPrxCallback : public "+ name + "PrxCallback" + LineFeed(indent);
+ out += "{";
+ out += LineFeed(indent);
+ out += "public:" + LineFeed(++indent) + "virtual ~" + name + "CoroPrxCallback() {}";
+ out += LineFeed(indent); // next: response-context accessors; the generated onDispatch below calls setResponseContext()
+ out += "virtual const map<std::string, std::string> & getResponseContext() const { return _mRspContext; }" + LineFeed(indent);
+ out += "virtual void setResponseContext(const map<std::string, std::string> &mContext) { _mRspContext = mContext; }" + LineFeed(indent) + LineFeed(indent);
+ // sort by method name: the generated onDispatch uses equal_range on __all, which requires an ordered table
+ std::map<std::string, const ::google::protobuf::MethodDescriptor*> m_method;
+ for (int i = 0; i < desc->method_count(); ++i)
+ {
+ m_method[desc->method(i)->name()] = desc->method(i);
+ }
+ // per-method members come from GenCallbackMethod (presumably the callback_<method> / _exception pair used below - confirm)
+ for(auto it = m_method.begin(); it != m_method.end(); ++it)
+ {
+ auto method = it->second;
+ out += GenCallbackMethod(method, pkg, indent);
+ }
+
+ out += LineFeed(indent);
+ out += LineFeed(indent);
+
+ // gen onDispatch: looks msg->request.sFuncName up in the sorted name table and switches on its index
+ out += LineFeed(indent);
+ out += LineFeed(indent);
+ out += "virtual int onDispatch(tars::ReqMessagePtr msg)";
+ out += LineFeed(indent);
+ out += "{";
+ out += LineFeed(++indent);
+ out += "static ::std::string __all[] = ";
+ out += "{";
+ out += LineFeed(++indent);
+ for(auto it = m_method.begin(); it != m_method.end(); ++it)
+ {
+ auto method = it->second;
+ out += "\"" + method->name() + "\",";
+ out += LineFeed(indent);
+ }
+ out += LineFeed(--indent);
+ out += "};" + LineFeed(indent);
+ out += "pair<string*, string*> r = equal_range(__all, __all + " + std::to_string((long long)desc->method_count()) + ", " + "std::string(msg->request.sFuncName));";
+ out += LineFeed(indent);
+ out += "if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;" + LineFeed(indent);
+ out += "switch(r.first - __all)" + LineFeed(indent);
+ out += "{";
+ out += LineFeed(++indent);
+ int i = 0; // case label == position of the method in the sorted name table
+ for(auto it = m_method.begin(); it != m_method.end(); ++it)
+ {
+ auto method = it->second;
+ out += LineFeed(indent);
+ out += "case " + std::to_string((long long)i) + ":" + LineFeed(indent);
+ out += "{" + LineFeed(++indent);
+ out += "if (msg->response.iRet != tars::TARSSERVERSUCCESS)" + LineFeed(indent);
+ out += "{" + LineFeed(++indent);
+ out += "callback_" + method->name() + "_exception(msg->response.iRet);" + LineFeed(indent);
+ out += "return msg->response.iRet;" + LineFeed(--indent) + "}";
+ // success path: decode the response buffer into the method's output message type
+ out += LineFeed(indent);
+ out += ToCppNamespace(method->output_type()->full_name()) + " _ret;" + LineFeed(indent);
+ out += "_ret.ParseFromArray(msg->response.sBuffer.data(), msg->response.sBuffer.size());" + LineFeed(indent);
+ // NOTE(review): the generated code ignores ParseFromArray's bool result - consider reporting TARSCLIENTDECODEERR on failure
+ out += "try" + LineFeed(indent);
+ out += "{" + LineFeed(indent);
+ out += LineFeed(++indent);
+ out += "setResponseContext(msg->response.context);" + LineFeed(indent);
+ out += "callback_" + method->name() + "(_ret);" + LineFeed(indent);
+ out += LineFeed(--indent);
+ out += "}" + LineFeed(indent);
+ out += "catch(std::exception &ex)" + LineFeed(indent);
+ out += "{" + LineFeed(indent);
+ out += LineFeed(++indent);
+ out += "callback_" + method->name() + "_exception(tars::TARSCLIENTDECODEERR);" + LineFeed(indent);
+ out += "return tars::TARSCLIENTDECODEERR;" + LineFeed(indent);
+ out += LineFeed(--indent);
+ out += "}" + LineFeed(indent);
+ out += "catch(...)" + LineFeed(indent);
+ out += "{" + LineFeed(indent);
+ out += LineFeed(++indent);
+ out += "callback_" + method->name() + "_exception(tars::TARSCLIENTDECODEERR);" + LineFeed(indent);
+ out += "return tars::TARSCLIENTDECODEERR;" + LineFeed(indent);
+ out += LineFeed(--indent);
+ out += "}" + LineFeed(indent);
+
+ out += "return tars::TARSSERVERSUCCESS;";
+
+ out += LineFeed(--indent);
+ out += "}";
+
+ ++i;
+ }
+
+ // end switch
+ out += LineFeed(--indent);
+ out += "}";
+
+ out += LineFeed(indent);
+ out += LineFeed(indent);
+ out += "return tars::TARSSERVERNOFUNCERR;";
+ out += LineFeed(--indent);
+ out += "}" + LineFeed(indent); // end of onDispatch
+ // the line feed above keeps "protected:" from being glued onto the closing brace of onDispatch
+ out += "protected:" + LineFeed(indent);
+ out += LineFeed(++indent);
+ out += "map<std::string, std::string> _mRspContext;" + LineFeed(indent);
+ out += LineFeed(--indent);
+ out += "};"; // end of class <Service>CoroPrxCallback
+
+ out += LineFeed(indent);
+ out += "typedef tars::TC_AutoPtr<" + name + "CoroPrxCallback> " + name + "CoroPrxCallbackPtr;";
+ out += LineFeed(indent);
+
+ return out;
+}
diff --git a/tools/pb2tarscpp/CppGenCallback.h b/tools/pb2tarscpp/CppGenCallback.h
index 9bc843f..dcb121f 100644
--- a/tools/pb2tarscpp/CppGenCallback.h
+++ b/tools/pb2tarscpp/CppGenCallback.h
@@ -21,4 +21,6 @@ class ServiceDescriptor;
// gen prx callback
std::string GenPrxCallback(const ::google::protobuf::ServiceDescriptor* desc, int indent);
+// Generates the text of class <Service>CoroPrxCallback (the coroutine proxy callback) for the service `desc`.
+std::string GenCoroPrxCallback(const ::google::protobuf::ServiceDescriptor* desc, int indent);
diff --git a/tools/pb2tarscpp/CppGenProxy.cpp b/tools/pb2tarscpp/CppGenProxy.cpp
index 2298ad0..d370cb2 100644
--- a/tools/pb2tarscpp/CppGenProxy.cpp
+++ b/tools/pb2tarscpp/CppGenProxy.cpp
@@ -63,6 +63,27 @@ static std::string GenAsyncCall(const ::google::protobuf::MethodDescriptor* meth
return out;
}
+static std::string GenCoroCall(const ::google::protobuf::MethodDescriptor* method,
+ const std::string& name,
+ const std::string& pkg,
+ int indent) { // returns the text of the generated coro_<method>(callback, req, context) proxy member
+ const auto& func = method->name();
+ const auto reqType = ToCppNamespace(method->input_type()->full_name());
+ std::string code;
+ code.reserve(8 * 1024);
+ code += "void coro_" + func + "(" + name + "PrxCallbackPtr callback, const " + reqType;
+ code += "& req, const std::map<std::string, std::string>& context = TARS_CONTEXT())" + LineFeed(indent);
+ code += "{" + LineFeed(++indent);
+ code += "std::string _os;" + LineFeed(indent);
+ code += "req.SerializeToString(&_os);" + LineFeed(indent);
+ code += "std::vector<char> _vc(_os.begin(), _os.end());" + LineFeed(indent);
+ code += "std::map<std::string, std::string> _mStatus;" + LineFeed(indent);
+ code += "tars_invoke_async(tars::TARSNORMAL, \"" + func + "\", _vc, context, _mStatus, callback, true);";
+ code += LineFeed(--indent) + "}";
+ code += LineFeed(indent);
+ return code;
+}
+
std::string GenPrx(const ::google::protobuf::ServiceDescriptor* desc, int indent) {
std::string out;
out.reserve(8 * 1024);
@@ -98,6 +119,8 @@ std::string GenPrx(const ::google::protobuf::ServiceDescriptor* desc, int indent
out += GenSyncCall(method, pkg, indent);
// async method call
out += GenAsyncCall(method, name, pkg, indent);
+ // coro method call
+ out += GenCoroCall(method, name, pkg, indent);
}
diff --git a/tools/pb2tarscpp/CppPlugin.cpp b/tools/pb2tarscpp/CppPlugin.cpp
index 69a5aec..312d8cd 100644
--- a/tools/pb2tarscpp/CppPlugin.cpp
+++ b/tools/pb2tarscpp/CppPlugin.cpp
@@ -33,6 +33,7 @@ bool CppTarsGenerator::Generate(const google::protobuf::FileDescriptor *file,
content += LineFeed(indent);
for (int i = 0; i < file->service_count(); ++i) {
content += GenPrxCallback(file->service(i), indent);
+ content += GenCoroPrxCallback(file->service(i), indent);
content += GenPrx(file->service(i), indent);
content += GenServant(file->service(i), indent);
}
tars要开启协程的话,还需要在模块配置文件里面打开协程的支持(不然默认不支持协程的调用),并配置协程内存大小和栈大小。
// Coroutine switch: read from /tars/application/server<opencoroutine>; "1" is the fallback value passed to toDefault.
ServerConfig::OpenCoroutine = TC_Common::strto<bool>(toDefault(_conf.get("/tars/application/server<opencoroutine>"), "1"));
// Coroutine memory size: key <coroutinememsize>, fallback 1073741824 (presumably bytes, i.e. 1 GiB - confirm).
ServerConfig::CoroutineMemSize = TC_Common::toSize(toDefault(_conf.get("/tars/application/server<coroutinememsize>"), "1073741824"), 1073741824);
// Coroutine stack size: key <coroutinestack>, fallback 131072 (presumably bytes, i.e. 128 KiB - confirm).
ServerConfig::CoroutineStackSize = TC_Common::toSize(toDefault(_conf.get("/tars/application/server<coroutinestack>"), "131072"), 131072);
照着远程的例子写了一个调用,是可以成功的。具体参考:
https://github.com/TarsCloud/TarsCpp/blob/master/examples/CoroutineDemo/testParallelCoro/main.cpp
// Issues two coro_* proxy calls that share one CoroParallelBase, then waits on it via coroWhenAll.
CoroParallelBasePtr sharedPtr = new CoroParallelBase(2); // 2 is presumably the number of calls to wait for - confirm
BServantCoroCallbackPtr cb1 = new BServantCoroCallback();
cb1->setCoroParallelBasePtr(sharedPtr);
_prx->coro_testCoroSerial(cb1, sIn);
BServantCoroCallbackPtr cb2 = new BServantCoroCallback();
cb2->setCoroParallelBasePtr(sharedPtr);
_prx->coro_testCoroParallel(cb2, sIn);
coroWhenAll(sharedPtr); // presumably suspends until both callbacks have completed - confirm against the linked example
cout << "ret1:" << cb1->_sOut << "|ret2:" << cb2->_sOut << endl;
;
// Count this round only when _iRet and _iException are 0 on both callbacks.
if(cb1->_iRet == 0 && cb2->_iRet == 0 && cb1->_iException == 0 && cb2->_iException == 0)
{
++sum;
}