系统组件 - Cluster

优质
小牛编辑
133浏览
2023-12-01

EasySwoole 提供基础的对等模式分布式通讯支持。

知识储备

UDP

什幺是UDP协议

什幺是UDP协议请自行百度。

UDP广播地址

广播地址(Broadcast Address)是专门用于同时向网络中所有工作站进行发送的一个地址。在使用TCP/IP 协议的网络中,主机标识段host ID 为全1 的IP 地址为广播地址,广播的分组传送给host ID段所涉及的所有计算机。例如,对于10.1.1.0 (255.255.255.0 )网段,其广播地址为10.1.1.255 (255 即为2 进制的11111111 ),当发出一个目的地址为10.1.1.255 的分组(封包)时,它将被分发给该网段上的所有计算机。

实现讲解

原理讲解

在开启集群模式的时候,EasySwoole会开启一个UDP子服务,用于集群的UDP信息广播,每当有收到信息,就会自动进行openssl解密,并根据解析后的Message,
执行Message回调容器中与Message Command字段匹配的事件。在BaseService的自定义进程中,每一个信息广播周期,都会执行广播回调容器内的全部事件,而EasySwoole注册了默认的集群节点广播、RPC广播(全部消息都openssl加密)。

核心代码讲解

Cluster.php

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: yf
  5. * Date: 2018/1/24
  6. * Time: 下午10:51
  7. */
  8. namespace EasySwoole\Core\Component\Cluster;
  9. use EasySwoole\Config;
  10. use EasySwoole\Core\AbstractInterface\Singleton;
  11. use EasySwoole\Core\Component\Cluster\Callback\BroadcastCallbackContainer;
  12. use EasySwoole\Core\Component\Cluster\Callback\DefaultCallbackName;
  13. use EasySwoole\Core\Component\Cluster\Callback\ShutdownCallBackContainer;
  14. use EasySwoole\Core\Component\Cluster\Common\BaseServiceProcess;
  15. use EasySwoole\Core\Component\Cluster\Common\MessageBean;
  16. use EasySwoole\Core\Component\Cluster\Common\NodeBean;
  17. use EasySwoole\Core\Component\Cluster\Callback\MessageCallbackContainer;
  18. use EasySwoole\Core\Component\Cluster\NetWork\Deliverer;
  19. use EasySwoole\Core\Component\Cluster\NetWork\PacketParser;
  20. use EasySwoole\Core\Component\Openssl;
  21. use EasySwoole\Core\Component\Rpc\Common\ServiceNode;
  22. use EasySwoole\Core\Component\Rpc\Server;
  23. use EasySwoole\Core\Component\Trigger;
  24. use EasySwoole\Core\Socket\Client\Udp;
  25. use EasySwoole\Core\Swoole\EventHelper;
  26. use EasySwoole\Core\Swoole\Memory\TableManager;
  27. use EasySwoole\Core\Swoole\Process\ProcessManager;
  28. use EasySwoole\Core\Swoole\ServerManager;
  29. use Swoole\Table;
  30. class Cluster
  31. {
  32. use Singleton;
  33. private $currentNode;
  34. function __construct()
  35. {
  36. $conf = Config::getInstance()->getConf('CLUSTER');
  37. //加载配置信息,并实例化当前节点
  38. $this->currentNode = new NodeBean($conf);
  39. if($this->currentNode->getEnable() && empty($this->currentNode->getToken())){
  40. Trigger::throwable(new \Exception('cluster token could not be empty and set cluster mode disable automatic'));
  41. $this->currentNode->setEnable(false) ;
  42. }
  43. if($this->currentNode->getEnable() && empty($this->currentNode->getListenAddress())){
  44. Trigger::throwable(new \Exception('cluster listenAddress could not be empty and set cluster mode disable automatic'));
  45. $this->currentNode->setEnable(false) ;
  46. }
  47. //初始化swoole table用于记录节点信息
  48. TableManager::getInstance()->add('ClusterNodeList',[
  49. 'nodeName'=>[
  50. 'type'=>Table::TYPE_STRING,'size'=>20
  51. ],
  52. 'udpAddress'=>[
  53. 'type'=>Table::TYPE_STRING,'size'=>16
  54. ],
  55. 'udpPort'=>[
  56. 'type'=>Table::TYPE_INT,'size'=>10
  57. ],
  58. 'listenPort'=>[
  59. 'type'=>Table::TYPE_STRING,'size'=>10
  60. ],
  61. 'lastBeatBeatTime'=>[
  62. 'type'=>Table::TYPE_INT,'size'=>10
  63. ]
  64. ]);
  65. }
  66. function run()
  67. {
  68. if($this->currentNode->getEnable()){
  69. //注册默认回调
  70. self::registerDefaultCallback();
  71. $name = Config::getInstance()->getConf('SERVER_NAME');
  72. //注册基础服务进程
  73. ProcessManager::getInstance()->addProcess("{$name}_Cluster_BaseService",BaseServiceProcess::class,['currentNode'=>$this->currentNode]);
  74. //开启UDP子服务,
  75. $sub = ServerManager::getInstance()->addServer("{$name}_Cluster",$this->currentNode->getListenPort(),SWOOLE_SOCK_UDP,$this->currentNode->getListenAddress());
  76. $openssl = new Openssl($this->currentNode->getToken());
  77. EventHelper::register($sub,$sub::onPacket,function (\swoole_server $server, string $data, array $client_info)use($openssl){
  78. $data = $openssl->decrypt($data);
  79. $udpClient = new Udp($client_info);
  80. //解析信息包并执行回调
  81. $message = PacketParser::unpack((string)$data,$udpClient);
  82. if($message){
  83. MessageCallbackContainer::getInstance()->hook($message->getCommand(),$message);
  84. }
  85. });
  86. }
  87. }
  88. function currentNode():NodeBean
  89. {
  90. return $this->currentNode;
  91. }
  92. function allNodes():array
  93. {
  94. $ret = [];
  95. $list = TableManager::getInstance()->get('ClusterNodeList');
  96. $time = time();
  97. $ttl = $this->currentNode->getNodeTimeout();
  98. foreach ($list as $key => $item){
  99. $node = new NodeBean([
  100. 'nodeId'=>$key,
  101. 'nodeName'=>$item['nodeName'],
  102. 'lastBeatBeatTime'=>$item['lastBeatBeatTime'],
  103. 'udpInfo'=>[
  104. 'address'=>$item['udpAddress'],
  105. 'port'=>$item['udpPort']
  106. ],
  107. 'listenPort'=>$item['listenPort']
  108. ]);
  109. if($time - $item['lastBeatBeatTime'] > $ttl){
  110. //异常下线
  111. NodeOffLienCallbackContainer::getInstance()->call($node,false);
  112. TableManager::getInstance()->get('ClusterNodeList')->del($key);
  113. }else{
  114. $ret[] = $node;
  115. }
  116. }
  117. return $ret;
  118. }
  119. function getNode($nodeId):?NodeBean
  120. {
  121. $item = TableManager::getInstance()->get('ClusterNodeList')->get($nodeId);
  122. if(is_array($item)){
  123. $ttl = $this->currentNode->getNodeTimeout();
  124. $node = new NodeBean([
  125. 'nodeId'=>$nodeId,
  126. 'nodeName'=>$item['nodeName'],
  127. 'lastBeatBeatTime'=>$item['lastBeatBeatTime'],
  128. 'udpInfo'=>[
  129. 'address'=>$item['udpAddress'],
  130. 'port'=>$item['udpPort']
  131. ],
  132. 'listenPort'=>$item['listenPort']
  133. ]);
  134. if(time() - $item['lastBeatBeatTime'] > $ttl){
  135. NodeOffLienCallbackContainer::getInstance()->call($node,false);
  136. return null;
  137. }else{
  138. return $node;
  139. }
  140. }else{
  141. return null;
  142. }
  143. }
  144. /*
  145. * 注册默认服务
  146. */
  147. private static function registerDefaultCallback()
  148. {
  149. //集群节点广播回调
  150. MessageCallbackContainer::getInstance()->add(DefaultCallbackName::CLUSTER_NODE_BROADCAST,function (MessageBean $messageBean){
  151. $node = $messageBean->getFromNode();
  152. TableManager::getInstance()->get('ClusterNodeList')->set($node->getNodeId(),[
  153. 'nodeName'=>$node->getNodeName(),
  154. 'udpAddress'=>$node->getUdpInfo()->getAddress(),
  155. 'udpPort'=>$node->getUdpInfo()->getPort(),
  156. 'lastBeatBeatTime'=>time(),
  157. 'listenPort'=>$node->getListenPort()
  158. ]);
  159. });
  160. //集群节点广播关机回调
  161. MessageCallbackContainer::getInstance()->add(DefaultCallbackName::CLUSTER_NODE_SHUTDOWN,function (MessageBean $messageBean){
  162. $node = $messageBean->getFromNode();
  163. TableManager::getInstance()->get('ClusterNodeList')->del($node->getNodeId());
  164. //下线该服务的全部rpc服务
  165. Server::getInstance()->serverNodeOffLine($node);
  166. //正常下线
  167. NodeOffLienCallbackContainer::getInstance()->call($node,true);
  168. });
  169. //RPC服务节点广播回调
  170. MessageCallbackContainer::getInstance()->add(DefaultCallbackName::RPC_SERVICE_BROADCAST,function (MessageBean $messageBean){
  171. $node = $messageBean->getFromNode();
  172. $list = $messageBean->getArgs();
  173. foreach ($list as $item){
  174. $serviceNode = new ServiceNode($item);
  175. //可达主机地址即为udp地址(真实地址)
  176. $serviceNode->setAddress($node->getUdpInfo()->getAddress());
  177. Server::getInstance()->updateServiceNode($serviceNode);
  178. }
  179. });
  180. //集群节点广播
  181. BroadcastCallbackContainer::getInstance()->set(DefaultCallbackName::CLUSTER_NODE_BROADCAST,function (){
  182. $message = new MessageBean();
  183. $message->setCommand(DefaultCallbackName::CLUSTER_NODE_BROADCAST);
  184. Deliverer::broadcast($message);
  185. });
  186. //RPC服务广播
  187. BroadcastCallbackContainer::getInstance()->set(DefaultCallbackName::RPC_SERVICE_BROADCAST,function (){
  188. $ret = Server::getInstance()->allLocalServiceNodes();
  189. $data = [];
  190. foreach ($ret as $item){
  191. $data[] = $item->toArray();
  192. }
  193. $message = new MessageBean();
  194. $message->setArgs($data);
  195. $message->setCommand(DefaultCallbackName::RPC_SERVICE_BROADCAST);
  196. Deliverer::broadcast($message);
  197. });
  198. //注册默认集群关机回调
  199. ShutdownCallBackContainer::getInstance()->set(DefaultCallbackName::CLUSTER_NODE_SHUTDOWN,function (){
  200. $message = new MessageBean();
  201. $message->setCommand(DefaultCallbackName::CLUSTER_NODE_SHUTDOWN);
  202. Deliverer::broadcast($message);
  203. });
  204. }
  205. }

BaseServiceProcess.php

  1. namespace EasySwoole\Core\Component\Cluster\Common;
  2. use EasySwoole\Core\Component\Cluster\Callback\BroadcastCallbackContainer;
  3. use EasySwoole\Core\Component\Cluster\Callback\ShutdownCallbackContainer;
  4. use EasySwoole\Core\Swoole\Process\AbstractProcess;
  5. use Swoole\Process;
  6. class BaseServiceProcess extends AbstractProcess
  7. {
  8. public function run(Process $process)
  9. {
  10. // TODO: Implement run() method.
  11. //定时执行广播周期回调事件
  12. $this->addTick($this->getArg('currentNode')->getBroadcastTTL()*1000,function (){
  13. BroadcastCallbackContainer::getInstance()->call();
  14. });
  15. }
  16. public function onShutDown()
  17. {
  18. // TODO: Implement onShutDown() method.
  19. //守护模式的时候,正常关闭服务,会执行该回调
  20. ShutdownCallbackContainer::getInstance()->call();
  21. }
  22. public function onReceive(string $str, ...$args)
  23. {
  24. // TODO: Implement onReceive() method.
  25. }
  26. }

Deliverer.php 可以在任意位置,做UDP信息发送操作

  1. namespace EasySwoole\Core\Component\Cluster\NetWork;
  2. use EasySwoole\Core\Component\Cluster\Cluster;
  3. use EasySwoole\Core\Component\Cluster\Common\MessageBean;
  4. use EasySwoole\Core\Component\Cluster\Common\NodeBean;
  5. class Deliverer
  6. {
  7. /*
  8. * 调用此方法,请确保知晓节点的udp信息
  9. */
  10. public static function toNode(MessageBean $message,NodeBean $node)
  11. {
  12. $message = PacketParser::pack($message);
  13. //端口以监听地址为准,ip地址以udp地址为准
  14. Udp::sendTo($message,$node->getListenPort(),$node->getUdpInfo()->getAddress());
  15. }
  16. public static function toAllNode(MessageBean $message)
  17. {
  18. $message = PacketParser::pack($message);
  19. $nodes = Cluster::getInstance()->allNodes();
  20. foreach ($nodes as $node){
  21. Udp::sendTo($message,$node->getListenPort(),$node->getUdpInfo()->getAddress());
  22. }
  23. }
  24. public static function broadcast(MessageBean $message)
  25. {
  26. $message = PacketParser::pack($message);
  27. $addresses = Cluster::getInstance()->currentNode()->getBroadcastAddress();
  28. foreach ($addresses as $item){
  29. $item = explode(':',$item);
  30. Udp::broadcast($message,$item[1],$item[0]);
  31. }
  32. }
  33. }

常见问题

如何实现跨机房通讯?

假设你在阿里云和腾讯云都各有一台机器,那幺如何实现两台机器互联呢?那幺请注意,EasySwoole允许设置多个广播地址,你可以设置
[ip1:port,ip2:port],注意,此刻ip1,ip2全部为公网ip,且都开放了端口。如果是全部都在同一机房内,那幺仅需向255.255.255.255(默认)广播即可。具体的请以实际的路由网关规则为准。

某个节点异常下线怎幺办

正常情况下,当有节点下线的时候,会触发默认注册的节点下线回调事件,全部节点都会收到节点下线通知,若某台机器意外关机,那幺最大在经过配置项中的nodeTimeout以后,那幺该节点会被清除,并执行回调。

如何注册自己的命令

EasySwoole\Core\Component\Cluster\Callback路径下的全部容器,都是单利模式,主进程中,在EasySwooleEvent.php的主服务创建事件中即可注册。

如何用对等模式实现主从模式功能

假设A-Z台机器,你需要A去收集其他的机器信息,那幺你可以在A中部署独有服务,然后其他机器发送错误的时候,通过RPC,或者是UDP发送的方式,发送给A节点。