前面已经提到过sframe将服务器节点抽象成了服务节点。在sframe中,所有的服务必须继承sframe::Service类,才能被注册到sframe::ServiceDispatcher中,被其管理调度。sframe是消息驱动的,所以一个服务运行的本质就是不停地处理各种各样的消息。在一个sframe的进程中,sframe::ServiceDispatcher维护若干个逻辑线程,这些逻辑线程负责执行Service的消息处理逻辑。
一个Service有一个消息队列,保存本服务的待处理消息。ServiceDispatcher中有一个待处理服务队列,保存了有待处理消息的服务。
逻辑线程循环地从ServiceDispatcher的待处理服务队列中取服务,成功取出后,一次性处理完该服务所有的待处理消息。然后重新开始等待队列。同一个服务在队列中必须是唯一的,也就是说若服务A已经在队列中了,若有新的消息到来,A不会再次入队列。这也就保证了同一个服务内的逻辑是线程安全的。
对于一个逻辑服务来说,主要有以下消息需要处理。
1. 服务间消息
也就是服务发送给服务的消息。逻辑服务必须事先注册消息处理函数,然后服务处理消息时,会自动根据注册时的信息,解码消息,然后调用消息处理函数。注册消息处理函数调用
(1) RegistInsideServiceMessageHandler()。注册内部服务消息处理函数,也就是只有本进程的其他服务发的消息才能处理。
(2) RegistNetServiceMessageHandler()。注册网络服务消息处理函数,也就是只有其他进程的服务通过网络发过来的消息才能处理。
(3) RegistServiceMessageHandler()。注册服务消息处理函数,不管是来自本进程的其他服务的消息,或是其他进程的服务的消息,都处理。这也是最常用的,对于一般的业务逻辑来说,不用关心对方服务是否在本进程。
2. 周期定时消息
周期定时器是服务器最常见的功能之一,服务可通过重写GetCyclePeriod()函数,来返回周期时长。然后服务每个周期都会收到周期定时消息,服务通过重写OnCycleTimer()函数来处理周期定时逻辑。
3. 服务销毁消息
在进程关闭时,服务会收到服务销毁消息,表明进程即将关闭,逻辑服务需要立即执行销毁代码。逻辑服务通过重写OnDestroy()来处理销毁消息。
对于一些不能立即销毁的情况,可通过重写IsDestroyCompleted(),返回此时服务是否销毁完成。若销毁未完成,sframe会继续等待,最多等待3秒。
某些时候,销毁服务时可能必须有优先级,也就是说必须等待A销毁完成后才能销毁B。此时可通过重写GetDestroyPriority()来返回服务的销毁优先级,越大越靠后销毁。
4. 新TCP连接建立消息
作为服务器,肯定有需要直接处理TCP连接的时候。比如对于一套分布式服务器来说,网关服务处理与客户端的连接,所以他必须直接处理每一个连接。对此,sframe也有比较好的方案来支持。
ServiceDispatcher有设置自定义监听地址的方法,函数形如:
void SetCustomListenAddr(const std::string & desc_name,conststd::string & ip, uint16_t port, int32_t handle_service);
desc_name,为此地址的描述名称,设置名称便于区分、观察。
ip,监听IP地址
port,监听的端口
handle_service,处理连接的服务ID
通过此方法,可设置自定义监听地址。当该地址有新连接建立时,sframe会给设置时指定的服务(即handle_service)发送连接建立消息。服务通过重写OnNewConnection()函数来处理该消息。
1. 网络
sframe封装了一套简单的跨windows和linux的异步网络库,目前只支持TCP。在windows平台使用完成端口,在linux平台使用epoll。
IoService类封装了IO服务。对于windows来说,它封装了完成端口;对于linux来说它封装了epoll。程序需要开一个线程来循环执行IoService对象的RunOnce()方法。
TcpSocket和TcpAcceptor类都是对socket的封装。TcpSocket处理tcp连接相关业务,TcpAcceptor处理监听TCP连接的业务,新到来一个连接,便创建一个TcpSocket对象。两者都采用监听器的方式来通知外部程序相关的事件,具体请参考源代码。
ServiceDispatcher维护了一个IO线程来处理所有的网络事件。接收了数据或是接受了新的连接后,通过消息发送到逻辑线程。
2. 服务间通信
在sframe中,有一个特殊的服务ProxyService,他的服务ID是0,用以代理远程服务消息,以及维护与远程服务之间的连接。这个服务对用户是不可见的。
sframe启动前,通过ServiceDispatcher的SetServiceListenAddr()函数来设置本进程的远程服务连接监听地址,外部服务通过连接此地址来与本服务通信。通过RegistRemoteService()来注册远程服务地址,本服务通过此地址与远程服务交互。
逻辑服务A若要发送消息到逻辑服务B,调用ServiceDispatcher的SendServiceMsg()方法即可。ServiceDispatcher会判断目标服务B是本地服务还是远程服务。若服务B是本地服务,那么直接将消息压入其消息队列即可;若服务B不是本地服务,便会将消息先发送给ProxyService,ProxyService会查找到对应的连接会话,将消息序列化后发送出去。
服务B收到消息后,根据事先注册好的消息处理函数的信息,对消息进行解码,然后派发给函数。在这个过程中,sframe完全屏蔽了逻辑服务与目标服务的物理机器地址之间的联系。
发送的消息,必须和目标服务支持的格式对应。比如说服务B对某消息的处理函数是void MsgHandler(int, std::string, std::map<int, std::string>);那么服务A在发送消息时,参数必须完全对应,SendServiceMsg(…, int, std::string, std::map<int, std::string>)。否者,将导致服务B解码失败,便不会处理。sframe也支持消息参数为自定义结构体,但是要求在定义结构体时,通过sframe提供的库,对其进行序列化反序列化包装,具体请参考示例代码。