系统组件 - RPC

优质
小牛编辑
141浏览
2023-12-01

EasySwoole 提供开放式的RPC服务。RPC服务分为三级模式:服务=>服务组=>行为。每个服务可以单独现在Openssl加密。
支持超时、成功、失败回调(即最基础的熔断保护和服务降级)

示例代码

服务端

服务A

  1. namespace App\RpcController\A;
  2. use EasySwoole\Core\Component\Rpc\AbstractInterface\AbstractRpcService;
  3. class G extends AbstractRpcService
  4. {
  5. function index()
  6. {
  7. // TODO: Implement index() method.
  8. $this->response()->setArgs([12,3]);
  9. }
  10. }

服务A中存在G服务组,G服务组中实现了Index行为。

服务B

  1. namespace App\RpcController\B;
  2. use EasySwoole\Core\Component\Rpc\AbstractInterface\AbstractRpcService;
  3. class Index extends AbstractRpcService
  4. {
  5. function index()
  6. {
  7. // TODO: Implement index() method.
  8. var_dump('hit');
  9. $this->response()->setResult('this is b index');
  10. }
  11. }

服务绑定

  1. use EasySwoole\Core\Component\Rpc\Server;
  2. public static function mainServerCreate(ServerManager $server,EventRegister $register): void
  3. {
  4. // TODO: Implement mainServerCreate() method.
  5. Server::getInstance()->addService('A',9502)
  6. ->addService('B',9503,'password123')
  7. ->attach();
  8. }

客户端

客户端测试代码

  1. require_once 'vendor/autoload.php';
  2. \EasySwoole\Core\Core::getInstance()->initialize();
  3. //注册服务,让RPC服务管理中心知道当前系统中存在哪些服务
  4. $ServiceManager = \EasySwoole\Core\Component\Rpc\Server::getInstance();
  5. $ServiceManager->updateServiceNode(new \EasySwoole\Core\Component\Rpc\Common\ServiceNode(
  6. [
  7. 'serviceName'=>'A',
  8. 'port'=>9502
  9. ]
  10. ));
  11. $ServiceManager->updateServiceNode(new \EasySwoole\Core\Component\Rpc\Common\ServiceNode(
  12. [
  13. 'serviceName'=>'B',
  14. 'port'=>9503,
  15. 'encryptToken'=>'password123'
  16. ]
  17. ));
  18. //创建RPC客户端
  19. $client = new \EasySwoole\Core\Component\Rpc\Client();
  20. //调用A服务中G服务组的index行为
  21. $client->addCall('A','g','index')->setFailCall(function(\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  22. var_dump('11fail',$response);
  23. })->setSuccessCall(function (\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  24. var_dump('11success',$response);
  25. });
  26. //调用A服务中G服务组的c行为
  27. $client->addCall('A','g','c')->setFailCall(function(\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  28. var_dump('22fail',$response);
  29. })->setSuccessCall(function (\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  30. var_dump('22success',$response);
  31. });
  32. //调用A服务中c服务组的c行为
  33. $client->addCall('A','c','c')->setFailCall(function(\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  34. var_dump('33fail',$response);
  35. })->setSuccessCall(function (\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  36. var_dump('33success',$response);
  37. });
  38. //调用c服务中c服务组的c行为
  39. $client->addCall('c','c','c')->setFailCall(function(\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  40. var_dump('44fail',$response);
  41. })->setSuccessCall(function (\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  42. var_dump('44success',$response);
  43. });
  44. //调用B服务中c服务组的index行为
  45. $client->addCall('B','c','index')->setFailCall(function(\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  46. var_dump('55fail',$response);
  47. })->setSuccessCall(function (\EasySwoole\Core\Component\Rpc\Client\ServiceResponse $response){
  48. var_dump('55success',$response);
  49. });
  50. //执行调用
  51. $client->call();

在没有集群模式时,可以在EasySwooleEvent的主服务启动事件中,注册好存在的服务信息,以后客户端可以直接调用服务,不需要继续再做服务发现注册。
再集群模式时,有服务自动发现。

原理讲解

服务端

服务端实现关键代码

  1. namespace EasySwoole\Core\Component\Rpc;
  2. use EasySwoole\Core\AbstractInterface\Singleton;
  3. use EasySwoole\Core\Component\Openssl;
  4. use EasySwoole\Core\Component\Rpc\Common\Parser;
  5. use EasySwoole\Core\Component\Rpc\Common\ServiceResponse;
  6. use EasySwoole\Core\Component\Rpc\Common\Status;
  7. use EasySwoole\Core\Component\Rpc\Server\ServiceManager;
  8. use EasySwoole\Core\Component\Rpc\Server\ServiceNode;
  9. use EasySwoole\Core\Component\Trigger;
  10. use EasySwoole\Core\Socket\Client\Tcp;
  11. use EasySwoole\Core\Socket\Response;
  12. use EasySwoole\Core\Swoole\EventHelper;
  13. use EasySwoole\Core\Swoole\ServerManager;
  14. class Server
  15. {
  16. use Singleton;
  17. private $list = [];
  18. private $controllerNameSpace = 'App\\RpcController\\';
  19. private $protocolSetting = [
  20. 'open_length_check' => true,
  21. 'package_length_type' => 'N',
  22. 'package_length_offset' => 0,
  23. 'package_body_offset' => 4,
  24. 'package_max_length' => 1024*64,
  25. 'heartbeat_idle_time' => 5,
  26. 'heartbeat_check_interval' => 30,
  27. ];
  28. //可以自定义分包协议,这部分功能的parser 暂未分离,提前预留
  29. function setProtocolSetting(array $data)
  30. {
  31. $this->protocolSetting = $data;
  32. return $this;
  33. }
  34. //自定义RPC控制器名称空间
  35. function setControllerNameSpace(string $nameSpace):Server
  36. {
  37. $this->controllerNameSpace = $nameSpace;
  38. return $this;
  39. }
  40. //添加一个服务
  41. function addService(string $serviceName,int $port,$encryptToken = null,string $address = '0.0.0.0')
  42. {
  43. //一个EasySwoole服务上不允许同名服务
  44. $this->list[$serviceName] = [
  45. 'serviceName'=>$serviceName,
  46. 'port'=>$port,
  47. 'encryptToken'=>$encryptToken,
  48. 'address'=>$address
  49. ];
  50. return $this;
  51. }
  52. //绑定到主服务
  53. public function attach()
  54. {
  55. foreach ($this->list as $name => $item){
  56. $node = new ServiceNode();
  57. $node->setPort($item['port']);
  58. $node->setServiceName($name);
  59. $node->setEncryptToken($item['encryptToken']);
  60. ServiceManager::getInstance()->addServiceNode($node);
  61. $sub = ServerManager::getInstance()->addServer("RPC_SERVER_{$name}",$item['port'],SWOOLE_TCP,$item['address'],$this->protocolSetting);
  62. $nameSpace = $this->controllerNameSpace.ucfirst($item['serviceName']);
  63. EventHelper::register($sub,$sub::onReceive,function (\swoole_server $server, int $fd, int $reactor_id, string $data)use($item,$nameSpace){
  64. $response = new ServiceResponse();
  65. $client = new Tcp($fd,$reactor_id);
  66. //解包,获得原始完整字符串
  67. $data = Parser::unPack($data);
  68. $openssl = null;
  69. //若有加密配置,则对数据包解密
  70. if(!empty($item['encryptToken'])){
  71. $openssl = new Openssl($item['encryptToken']);
  72. }
  73. if($openssl){
  74. $data = $openssl->decrypt($data);
  75. }
  76. if($data !== false){
  77. //看看能否成功解析出命令
  78. $caller = Parser::decode($data,$client);
  79. if($caller){
  80. $response->arrayToBean($caller->toArray());
  81. $response->setArgs(null);
  82. $group = ucfirst($caller->getServiceGroup());
  83. //搜索有没有完整的服务=>服务组控制器
  84. $controller = "{$nameSpace}\\{$group}";
  85. if(!class_exists($controller)){
  86. $response->setStatus(Status::SERVICE_GROUP_NOT_FOUND);
  87. //若没有,则搜索有没有完整的服务Index控制器(默认Index服务组)
  88. $controller = "{$nameSpace}\\Index";
  89. if(!class_exists($controller)){
  90. $controller = null;
  91. }else{
  92. $response->setStatus(Status::OK);
  93. }
  94. }
  95. if($controller){
  96. try{
  97. (new $controller($client,$caller,$response));
  98. }catch (\Throwable $throwable){
  99. Trigger::throwable($throwable);
  100. $response->setStatus(Status::SERVICE_ERROR);
  101. }
  102. }else{
  103. $response->setStatus(Status::SERVICE_NOT_FOUND);
  104. }
  105. }else{
  106. $response->setStatus(Status::PACKAGE_DECODE_ERROR);
  107. }
  108. }else{
  109. $response->setStatus(Status::PACKAGE_ENCRYPT_DECODED_ERROR);
  110. }
  111. //进行json打包,并放回给客户端
  112. $response = json_encode($response->toArray(),JSON_UNESCAPED_SLASHES|JSON_UNESCAPED_UNICODE);
  113. if($openssl){
  114. $response = $openssl->encrypt($response);
  115. }
  116. Response::response($client,Parser::pack($response));
  117. });
  118. }
  119. }
  120. }

客户端

客户端基于swoole client + socket select实现的伪异步客户端。

跨平台调用

json请求结构体

  1. namespace EasySwoole\Core\Component\Rpc\Common;
  2. use EasySwoole\Core\Component\Spl\SplBean;
  3. class ServiceCaller extends SplBean
  4. {
  5. protected $serviceName;
  6. protected $serviceGroup;
  7. protected $serviceAction;
  8. protected $args = null;
  9. /**
  10. * @return mixed
  11. */
  12. public function getServiceName()
  13. {
  14. return $this->serviceName;
  15. }
  16. /**
  17. * @param mixed $serviceName
  18. */
  19. public function setServiceName($serviceName): void
  20. {
  21. $this->serviceName = $serviceName;
  22. }
  23. /**
  24. * @return mixed
  25. */
  26. public function getServiceGroup()
  27. {
  28. return $this->serviceGroup;
  29. }
  30. /**
  31. * @param mixed $serviceGroup
  32. */
  33. public function setServiceGroup($serviceGroup): void
  34. {
  35. $this->serviceGroup = $serviceGroup;
  36. }
  37. /**
  38. * @return mixed
  39. */
  40. public function getServiceAction()
  41. {
  42. return $this->serviceAction;
  43. }
  44. /**
  45. * @param mixed $serviceAction
  46. */
  47. public function setServiceAction($serviceAction): void
  48. {
  49. $this->serviceAction = $serviceAction;
  50. $this->initialize();
  51. }
  52. /**
  53. * @return null
  54. */
  55. public function getArgs()
  56. {
  57. return $this->args;
  58. }
  59. /**
  60. * @param null $args
  61. */
  62. public function setArgs($args): void
  63. {
  64. $this->args = $args;
  65. }
  66. protected function initialize(): void
  67. {
  68. if(empty($this->serviceAction)){
  69. $this->serviceAction = 'index';
  70. }
  71. }
  72. }

json相应结构体

  1. namespace EasySwoole\Core\Component\Rpc\Client;
  2. use EasySwoole\Core\Component\Rpc\Server\ServiceNode;
  3. class ServiceResponse extends \EasySwoole\Core\Component\Rpc\Common\ServiceResponse
  4. {
  5. protected $responseNode = null;
  6. /**
  7. * @return null
  8. */
  9. public function getResponseNode():?ServiceNode
  10. {
  11. return $this->responseNode;
  12. }
  13. /**
  14. * @param null $responseNode
  15. */
  16. public function setResponseNode($responseNode): void
  17. {
  18. $this->responseNode = $responseNode;
  19. }
  20. }

默认状态码规则

  1. namespace EasySwoole\Core\Component\Rpc\Common;
  2. class Status
  3. {
  4. const OK = 1;//rpc调用成功
  5. const SERVICE_REJECT_REQUEST = 0;//服务端拒绝执行,比如缺参数,或是恶意调用
  6. const SERVICE_NOT_FOUND = -1;//服务端告诉客户端没有该服务
  7. const SERVICE_GROUP_NOT_FOUND = -2;//服务端告诉客户端该服务不存在该服务组(服务控制器)
  8. const SERVICE_ACTION_NOT_FOUND = -3;//服务端告诉客户端没有该action
  9. const SERVICE_ERROR = -4;//服务端告诉客户端服务端出现了错误
  10. const PACKAGE_ENCRYPT_DECODED_ERROR = -5;//服务端告诉客户端发过来的包openssl解密失败
  11. const PACKAGE_DECODE_ERROR = -6;//服务端告诉客户端发过来的包无法成功解码为ServiceCaller
  12. const CLIENT_WAIT_RESPONSE_TIMEOUT = -7;//客户端等待响应超时
  13. const CLIENT_CONNECT_FAIL = -8;//客户端连接到服务端失败
  14. const CLIENT_SERVER_NOT_FOUND = -9;//客户端无法找到该服务
  15. }

默认tcp协议包体规则

  1. [
  2. 'open_length_check' => true,
  3. 'package_length_type' => 'N',
  4. 'package_length_offset' => 0,
  5. 'package_body_offset' => 4,
  6. 'package_max_length' => 1024 * 64
  7. ]

PHP示例代码

  1. $opensslKey = null;
  2. $opensslMethod = 'DES-EDE3';
  3. //构造服务调用
  4. $data = [
  5. 'serviceName'=>'A',//服务名称
  6. 'serviceGroup'=>'G',//服务组(RPC服务控制器名称)
  7. 'serviceAction'=>'index',//服务行为名(RPC服务控制器action名称)
  8. 'args'=>[
  9. 'a'=>1,
  10. 'b'=>2
  11. ]
  12. ];
  13. $fp = stream_socket_client('tcp://127.0.0.1:9502');
  14. //数据打包
  15. $sendStr = json_encode($data,JSON_UNESCAPED_UNICODE|JSON_UNESCAPED_SLASHES);
  16. if($opensslKey){
  17. $sendStr = openssl_encrypt($sendStr,$opensslMethod,$opensslKey);
  18. }
  19. fwrite($fp,pack('N', strlen($sendStr)).$sendStr);
  20. //需要超时机制的请自己用sock time out
  21. $data = fread($fp,65533);
  22. //做长度头部校验
  23. $len = unpack('N',$data);
  24. $data = substr($data,'4');
  25. if(strlen($data) != $len[1]){
  26. echo 'data error';
  27. }else{
  28. if($opensslKey){
  29. $data = openssl_decrypt($data,$opensslMethod,$opensslKey);
  30. }
  31. $json = json_decode($data,true);
  32. //这就是服务端返回的结果,
  33. var_dump($json);
  34. }
  35. fclose($fp);

NodeJs 示例代码

  1. var net = require('net');
  2. var pack = require('php-pack').pack;
  3. var unpack = require('php-pack').unpack;
  4. var json = {
  5. serviceName:'A',
  6. serviceGroup:'G',
  7. serviceAction:'index',
  8. args:[]
  9. };
  10. var send = JSON.stringify(json);
  11. send = Buffer.concat([pack("N",send.length), Buffer.from(send)]);
  12. var client = new net.Socket();
  13. client.connect(9502, '127.0.0.1', function() {
  14. console.log('Connected');
  15. client.write(send);
  16. });
  17. client.on('data', function(data) {
  18. console.log('Received: ' + data);
  19. var ret = JSON.parse(data.toString().substr(4));
  20. console.log('status: ' + ret.status);
  21. client.destroy()
  22. });
  23. client.on('close', function() {
  24. console.log('Connection closed');
  25. });
  26. client.on('error',function (error) {
  27. console.log(error);
  28. });