当前位置: 首页 > 工具软件 > Skynet > 使用案例 >

skynet

令狐昌胤
2023-12-01
https://github.com/cloudwu/skynet

电商不是有C2B B2C C2C这些个概念么,互动百科的CEO甚至还对抄袭美其名曰:COPY TO CHINA (C2C)。
鸟人今天也为C2B赋予新意:COPY TO BLOG。哈哈
研究skynet的朋友请移步云风的博客,我这里纯属抄袭下来做标记笔记用的,因为个人阅读习惯如此。

http://blog.codingnow.com/2012/08/skynet.html Skynet 开源
这个系统是单进程多线程模型。
每个内部服务的实现,放在独立的动态库中。由动态库导出的三个接口 create init release 来创建出服务的实例。init 可以传递字符串参数来初始化实例。比如用 lua 实现的服务(这里叫 snlua ),可以在初始化时传递启动代码的 lua 文件名。
每个服务都是严格的被动的消息驱动的,以一个统一的 callback 函数的形式交给框架。框架从消息队列里取到消息,调度出接收的服务模块,找到 callback 函数入口,调用它。服务本身在没有被调度时,是不占用任何 CPU 的。
我用多线程模型来实现它。底层有一个线程消息队列,消息由三部分构成:源地址、目的地址、以及数据块。框架启动固定的多条线程,每条工作线程不断的从消息队列取到消息。根据目的地址获得服务对象。当服务正在工作(被锁住)就把消息放到服务自己的私有队列中。否则调用服务的 callback 函数。当 callback 函数运行完后,检查私有队列,并处理完再解锁。
启动新的服务和杀掉服务我把它们做到了框架内,以 skynet command 的形式提供。

Timer 及时间服务是一项基础功能。所以我实现在了框架内。特别是 timeout 为 0 的特例,是不进入 timer 队列,而是直接进入消息队列。这次我提供了 1/1000 秒的时间精度,以及 1/100 精度的 timeout 回调,对于游戏服务感觉是够用了。
对于 MMO 的基础需要, 我提供了 gate 的独立服务,用来处理大量的外部链接。其工作方式是,启动 gate 服务后,根据启动参数, listen 一个端口。接受连接上来的所有外部连接。gate 会为每个连接赋予一个唯一 id 号。注意,这个 id 号是尽量不复用的。
gate 会默认将所有外部连接的相关消息(连入,退出,有数据到来)发送给一个叫 watchdog 的服务。

在目前的范例中,watchdog 用 lua 实现。当一个外部连接接入,它会启动一个类型为 agent 的服务(也是用 lua 编写),并通知 gate 绑定这个外部连接的数据到新启动的 agent 上

http://blog.codingnow.com/2012/08/skynet_harbor_rpc.html Skynet 集群及 RPC
我用 32bit 的 id 来标识 skynet 上的服务节点。其中高 8 位是机器标识,低 24 位是同一台机器上的服务节点 id。
我设计了一台 master 中心服务器用来同步机器信息。把每个 skynet 进程上用于和其他机器通讯的部件称为 Harbor。每个 skynet 进程有一个 harbor id 为 1 到 255 (保留 0 给系统内部用)。在每个 skynet 进程启动时,向 master 机器汇报自己的 harbor id 。一旦冲突,则禁止连入。
master:
    master 服务其实就是一个简单的内存 key-value 数据库。数字 key 对应的 value 正是 harbor 的通讯地址。另外,支持了拥有全局名字的服务,也依靠 master 机器同步。比如,你可以从某台 skynet 节点注册一个叫 DATABASE 的服务节点,它只要将 DATABASE 和节点 id 的对应关系通知 master 机器,就可以依靠 master 机器同步给所有注册入网络的 skynet 节点。
    master 做的事情很简单,其实就是回应名字的查询,以及在更新名字后,同步给网络中所有的机器。
harbor:
    skynet 节点,通过 master ,认识网络中所有其它 skynet 节点。它们相互一一建立单向通讯通道。也就是说,如果一共有 100 个 skynet 节点,在它们启动完毕后,会建立起 1 万条通讯通道。
为了区分请求包和回应包。约定,请求包的 session id 为负数,回应包的 session id 为正数。不需要回应的包,可以用 0 做 session id 。
利用 coroutine ,做远程请求时,记录下产生的 session id ,yield 出来,把线程挂起;待到收到携带有相同 session id 的反馈包,把挂起的线程唤醒即可。

http://blog.codingnow.com/2012/08/skynet_dev.html Skynet 的一些改进和进展
增加了一个对外建立连接的服务 (connection) 。 建立起连接上,就让这个服务把所有外部连接上的数据推送到指定节点。
无论是 gate 的接收外部连接, 还是 connection 发起外部连接,处理 skynet 系统外的数据,都和内部数据包的流通有所不同。因为,外部包是可以组合成任何内部布局并加以处理的(可以有指针),而外部数据则必须是连续数据块,并有一定的包分割规则
认清这个不同点,我就增加了 内部和外部数据的识别方式(之前的版本是认为可以隐藏掉这个细节的)。 我的方法是用了一个特殊的 session 号 (0x7fffffff) 。因为 session id 是由 skynet 自己递增产生的,所以不会有冲突

http://blog.codingnow.com/2012/08/skynet_bug.html 记录一个并发引起的 bug
Skynet 的消息分发是这样做的:
所有的服务对象叫做 ctx ,是一个 C 结构。每个 ctx 拥有一个唯一的 handle 是一个整数
每个 ctx 有一个私有的消息队列 mq ,当一个本地消息产生时,消息内记录的是接收者的 handle ,skynet 利用 handle 查到 ctx ,并把消息压入 ctx 的 mq 。
ctx 可以被 skynet 清除。为了可以安全的清除,这里对 ctx 做了线程安全的引用计数。每次从 handle 获取对应的 ctx 时,都会对其计数加一,保证不会在操作 ctx 时,没有人释放 ctx 对象。
skynet 维护了一个全局队列,globalmq ,里面保存了若干 ctx 的 mq 。
这里为了效率起见(因为有大量的 ctx 大多数时间是没有消息要处理的),mq 为空时,尽量不放在 globalmq 里,防止 cpu 空转。
Skynet 开启了若干工作线程,不断的从 globalmq 里取出二级 mq 。我们需要保证,一个 ctx 的消息处理不被并发。所以,当一个工作线程从 globalmq 取出一个 mq ,在处理完成前,不会将它压回 globalmq 。
处理过程就是从 mq 中弹出一个消息,调用 ctx 的回调函数,然后再将 mq 压回 globalmq 。这里不把 mq 中所有消息处理完,是为了公平,不让一个 ctx 占用所有的 cpu 时间。当发现 mq 为空时,则放弃压回操作,节约 cpu 时间。
所以,产生消息的时刻,就需要执行一个逻辑:如果对应的 mq 不在 globalmq 中,把它置入 globalmq 。
需要考虑的另一个问题是 ctx 的初始化过程:
ctx 的初始化流程是可以发送消息出去的(同时也可以接收到消息),但在初始化流程完成前,接收到的消息都必须缓存在 mq 中,不能处理。我用了个小技巧解决这个问题。就是在初始化流程开始前,假装 mq 在 globalmq 中(这是由 mq 中一个标记位决定的)。这样,向它发送消息,并不会把它的 mq 压入 globalmq ,自然也不会被工作线程取到。等初始化流程结束,在强制把 mq 压入 globalmq (无论是否为空)。即使初始化失败也要进行这个操作。
当 ctx 销毁前,由它向其 mq 设入一个清理标记。然后在 globalmq 取出 mq ,发现已经找不到 handle 对应的 ctx 时,先判断是否有清理标记。如果没有,再将 mq 重放进 globalmq ,直到清理标记有效,在销毁 mq 。

http://blog.codingnow.com/2012/09/the_design_of_skynet.html Skynet 设计综述
int skynet_send(
  struct skynet_context * context,
  uint32_t source,
  uint32_t destination,
  int type,
  int session,
  void * msg,
  size_t sz
);
typedef int (*skynet_cb)(
  struct skynet_context * context,
  void *ud,
  int type,
  int session,
  uint32_t source ,
  const void * msg,
  size_t sz
);
作为核心功能,Skynet 仅解决一个问题:
     把一个符合规范的 C 模块,从动态库(so 文件)中启动起来,绑定一个永不重复(即使模块退出)的数字 id 做为其 handle 。模块被称为服务(Service),服务间可以自由发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。每个服务都是被一个个消息包驱动,当没有包到来的时候,它们就会处于挂起状态,对 CPU 资源零消耗。如果需要自主逻辑,则可以利用 Skynet 系统提供的 timeout 消息,定期触发。
Skynet 提供了名字服务,还可以给特定的服务起一个易读的名字,而不是用 id 来指代它。id 和运行时态相关,无法保证每次启动服务,都有一致的 id ,但名字可以。
发送一个数据包,就是发送 msg/sz 对。我们可以在 type 里打上 dontcopy 的 tag (PTYPE_TAG_DONTCOPY) ,让框架不要复制 msg/sz 指代的数据包。否则 skynet 会用 malloc 分配一块内存,把数据复制进去。callback 函数在处理完这块数据后,会调用 free 释放内存。你可以通过让 callback 返回 1 ,阻止框架释放内存。这通常和在 send 时标记 dontcopy 标记配对使用。
session:
    使用 skynet_send 发送一个包的时候,你可以在 type 里设上 alloc session 的 tag (PTYPE_TAG_ALLOCSESSION)。send api 就会忽略掉传入的 session 参数,而会分配出一个当前服务从来没有使用过的 session 号,发送出去。同时约定,接收方在处理完这个消息后,把这个 session 原样发送回来。这样,编写服务的人只需要在 callback 函数里记录下所有待返回的 session 表,就可以在收到每个消息后,正确的调用对应的处理函数。session在skynet_context(即每个服务)内部保持唯一性即可。
type:
    type 表示的是当前消息包的协议组别,而不是传统意义上的消息类别编号。协议组别类型并不会很多,所以,我限制了 type 的范围是 0 到 255 ,由一个字节标识。 在实现时,我把 type 编码到了 size 参数的高 8 位。因为单个消息包限制长度在 1.6 M (24 bit)内,是个合理的限制。这样,为每个消息增加了 type 字段,并没有额外增加内存上的开销。
    查看 skynet.h 我们可以看到已经定义出来的消息类型:
        #define PTYPE_TEXT 0
        #define PTYPE_RESPONSE 1
        #define PTYPE_MULTICAST 2
        #define PTYPE_CLIENT 3
        #define PTYPE_SYSTEM 4
        #define PTYPE_HARBOR 5
        #define PTYPE_TAG_DONTCOPY 0x10000
        #define PTYPE_TAG_ALLOCSESSION 0x20000
    0 是内部服务最为常用的文本消息类型。 1 表示这是一个回应包,应该依据对方的规范来解码。后面定义出来的几种类型暂时不解释,我们也可以自定义更多的类型,比如在目前的版本中,lua 层面还定义出了一些专用于 lua state 间通讯的消息编码类型。
集群
    单个 skynet 进程内的服务数量被 handle 数量限制。handle 也就是每个服务的地址,在接口上看用的是一个 32 位整数。但实际上单个服务中 handle 的最终限制在 24bit 内,也就是 1.6M 个。高 8 位是保留给集群间通讯用的。
    我们最终允许 255 个 skynet 节点部署在不同的机器上协作。每个 skynet 节点有不同的 id 。这里被称为 harbor id 。这个是独立指定,人为管理分配的(也可以写一个中央服务协调分配)。每个消息包产生的时候,skynet 框架会把自己的 harbor id 编码到源地址的高 8 位。这样,系统内所有的服务模块,都有不同的地址了。从数字地址,可以轻易识别出,这个消息是远程消息,还是本地消息。
    这也是 skynet 核心层做的事情,核心并不解决远程数据交互的工作。
     集群间的通讯,是由一个独立的 harbor 服务来完成的。所有的消息包在发送时,skynet 识别出这是一个远程消息包时,都会把它转发到 harbor 服务内。harbor 服务会建立 tcp 连接到所有它认识的其它 skynet 节点内的 harbor 服务上。
    Harbor 间通过单向的 tcp 连接管道传输数据,完成不同的 skynet 节点间的数据交换。
     skynet 目前支持一个全局名字服务,可以把一个消息包发送到特定名字的服务上。这个服务不必存在于当前 skynet 节点中。这样,我们就需要一个机构能够同步这些全局名字。
     为此,我实现了一个叫做 master 的服务。它的作用就是广播同步所有的全局名字,以及加入进来的 skynet 节点的地址。本质上,这些地址也是一种名字。同样可以用 key-value 的形式储存。即,每个 skynet 节点号对应一个字符串的地址。
组播
    skynet 会识别消息的 type 是否为 PTYPE_MULTICAST ,然后有不同的生命期管理策略,并把组播包交给组播服务处理。这一点,和集群间通讯的做法非常类似。
    组播服务并不解决分属在不同集群节点上的服务归组的问题。即,每个分组内的成员都必须在同一系统进程内。这可以极大的简化设计。用户可以让不同的服务 handle 归属一个组号。向 skynet 索取这个组号对应的 handle 。向这个组的 handle 发送消息,就等同于向组内所有 handle 发送消息。
    而跨集群分组又如何做到呢?这里是在上层用 lua 来做了进一步的封装。
    首先,提供了一个简单的,用 C 编写的服务,叫做 tunnel 。它可以把发送给它的消息,无条件的转发到另一个 handle 上。这个转发 handle 可以是在不同 skynet 节点上的。
    我用 lua 编写了一个全局的分组管理器,协调在不同节点上,创建出相同组名的分组来。然后用 tunnel 服务连接不同节点上的同一分组就够了。
Skynet 的核心功能就是发送消息和处理消息。它体现在 skynet_send 和 skynet_callback 两个 api 上。原本我并不打算把名字服务放在底层,但由于历史原因,增加了 skynet_sendname 这个 api 。数字地址也有一个字符串名字,以 : 开头,跟上 8 字符的 16 进制的数字串标识。同一节点内的服务地址用 . 开头,而全局名字则是其它的字符串。为了代码和协议简洁,做了一些小限制:全局名字不可以超过 16 个字符。
skynet 提供了一个叫做 skynet_command 的 C API ,作为基础服务的统一入口。它接收一个字符串参数,返回一个字符串结果。你可以看成是一种文本协议。但 skynet_command 保证在调用过程中,不会切出当前的服务线程,导致状态改变的不可预知性。其每个功能的实现,其实也是内嵌在 skynet 的源代码中,相同上层服务,还是比较高效的。(因为可以访问许多内存 api ,而不必用消息通讯的方式实现)
消息调度
    Skynet 维护了两级消息队列。
    每个服务实体有一个私有的消息队列,队列中是一个个发送给它的消息。消息由四部分构成:
    struct skynet_message {
        uint32_t source;
        int session;
        void * data;
        size_t sz;
    };
    向一个服务发送一个消息,就是把这样一个消息体压入这个服务的私有消息队列中。这个结构的值复制进消息队列的,但消息内容本身不做复制。
    Skynet 维护了一个全局消息队列,里面放的是诸个不为空的次级消息队列。
    在 Skynet 启动时,建立了若干工作线程(数量可配置),它们不断的从主消息列队中取出一个次级消息队列来,再从次级队列中取去一条消息,调用对应的服务的 callback 函数进行出来。 为了调用公平,一次仅处理一条消息,而不是耗净所有消息(虽然那样的局部效率更高,因为减少了查询服务实体的次数,以及主消息队列进出的次数),这样可以保证没有服务会被饿死。
     用户定义的 callback 函数不必保证线程安全,因为在 callback 函数被调用的过程中,其它工作线程没有可能获得这个 callback 函数所熟服务的次级消息队列,也就不可能被并发了。一旦一个服务的消息队列暂时为空,它的消息队列就不再被放会全局消息队列了。这样使大部分不工作的服务不会空转 CPU
Gate
     gate 服务。它的特征是监听一个 TCP 端口,接受连入的 TCP 连接,并把连接上获得的数据转发到 skynet 内部。Gate 可以用来消除外部数据包和 skynet 内部消息包的不一致性。外部 TCP 流的分包问题,是 Gate 实现上的约定。我实现了一个 gate 服务,它按两字节的大头字节序来表示一个分包长度。这个模块基于我前段时间的一个子项目 。理论上我可以实现的更为通用,可以支持可配置的分包方案(Erlang 在这方面做的很全面)。但我更想保持代码的精简。固然,如果用 skynet 去实现一个通用的 web server ,这个 gate 就不太合适了。但重写一个定制的 Gate 服务并不困难。为 web server 定制一个 gate 甚至更简单,因为不再需要分包了。
     Gate 会接受外部连接,并把连接相关信息转发给另一个服务去处理。它自己不做数据处理是因为我们需要保持 gate 实现的简洁高效。C 语言足以胜任这项工作。而包处理工作则和业务逻辑精密相关,我们可以用 Lua 完成。
    外部信息分两类,一类是连接本身的接入和断开消息,另一类是连接上的数据包。一开始,Gate 无条件转发这两类消息到同一个处理服务。但对于连接数据包,添加一个包头无疑有性能上的开销。所以 Gate 还接收另一种工作模式:把每个不同连接上的数据包转发给不同的独立服务上。每个独立服务处理单一连接上的数据包。
    或者,我们也可以选择把不同连接上的数据包从控制信息包(建立/断开连接)中分离开,但不区分不同连接而转发给同一数据处理服务(对数据来源不敏感,只对数据内容敏感的场合)。
     这三种模式,我分别称为 watchdog 模式,由 gate 加上包头,同时处理控制信息和数据信息的所有数据;agent 模式,让每个 agent 处理独立连接;以及 broker 模式,由一个 broker 服务处理不同连接上的所有数据包。无论是哪种模式,控制信息都是交给 watchdog 去处理的,而数据包如果不发给 watchdog 而是发送给 agent 或 broker 的话,则不会有额外的数据头(也减少了数据拷贝)。识别这些包是从外部发送进来的方法是检查消息包的类型是否为 PTYPE_CLIENT 。当然,你也可以自己定制消息类型让 gate 通知你。
    Skynet 的基础服务中,关于集群间通讯的那部分,已经采用了 gate 模块作为实现的一部分。但是 gate 模块是一个纯粹的 skynet 服务组件,仅使用了 skynet 对外的 api ,而没有涉及 skynet 内部的任何细节。在 Harbor 模块使用 gate 时,启用的 broker 模块,且定制了消息包类型为 PTYPE_HARBOR 。
    在开源项目的示范代码中,我们还启动了一个简单的 gate 服务,以及对应的 watchdog 和 agent 。可以用附带的 client 程序连接上去,通过文本协议和 skynet 进行交流。agent 会转发所有的 client 输入给 skynet 内部的 simpledb 服务,simpledb 是一个简易的 key-value 内存数据库。这样,从 client 就可以做基本的数据库查询和更新操作了。
    注意, Gate 只负责读取外部数据,但不负责回写。也就是说,向这些连接发送数据不是它的职责范畴。 作为示范,skynet 开源项目实现了一个简单的回写代理服务,叫做 service_client 。启动这个服务,启动时绑定一个 fd ,发送给这个服务的消息包,都会被加上两字节的长度包头,写给对应的 fd。根据不同的分包协议,可以自己定制不同的 client 服务来解决向外部连接发送数据的模块。
Connection
    和 Gate 不同,它负责从 skynet 内部建立 socket 到外部服务。
     Connection 分两个部分,一部分用于监听不同的系统 fd 的可读状态,这是用 epoll 实现的。如果在没有 epoll 支持的环境(比如 freebsd 下),可以很轻松的实现一个替代品。 它收到这个连接上的数据后,会把所有数据不做任何分包,转发到另一个服务里去处理。这和 gate 的行为不太一致,这是因为 connection 多用于使用外部第三方数据库,我们很难统一其分包的格式。
     另一部分是 Lua 相关的底层支持库,可以用于建立连接,以及对连接上数据常用的分包规则。
    我试着编写了 Redis 的支持模块。Redis 采用的文本协议,比较好解析,也就容易说明这个东西的用法。我们可以从 redis 支持模块中利用 connection 服务监视一个 tcp socket ,一旦收到任何数据就发送回来。这就不必在服务中阻塞读取外部 TCP 连接。任何 skynet 内部服务都不建议阻塞使用外部 IO 的,那会造成 CPU 的浪费。
    通过消息包的 type ,我们可以轻易识别出那些包是外部 tcp 连接上的数据块。使用提供好的 lua 模块,可以轻松的对这些数据分包(读一个指定字节数的数据块,或是读一个以回车结束的文本行)。Lua 的 coroutine 支持,可以轻松的在数据包并不完整时挂起,却不打断执行流程。
     另一个使用 Connection 模块的例子是 console 服务。skynet 的开源部分实现了一个简单的 Console 模块,可以读取进程的标准输入,按回车分割,并以用户输入的文本行去启动一个 lua 编写的服务。代码很短,很容易理解其工作方式。
lua 层的设计
    Lua 是 skynet 的标准配置,它不是必须的,但实际上被用在很多部分了。虽然完全可以用另一种语言比如 python 来替代掉 lua ,但我没有这个开发计划。
    在 Lua 的底层,skynet 封装了 skynet 最基本的 C API 。但是开发人员不必工作在这些底层 API 上,以 C 语言的思维来编写服务。
    Lua 的 coroutine 可以帮助我们把一个个在 C 层面分离的 callback 调用串成逻辑上连续的线索。当 Lua 编写的服务接收到一个外部请求时,对应的底层 callback 函数被调用,既而转发到 Lua 虚拟机中。skynet 的 lua 层会为每个请求创建一个独立的 coroutine 。
    一旦在处理这个请求的 coroutine 中发生远程调用,即发出一个消息包,coroutine 会挂起。在 C 层面,这次 callback 函数正常返回了。但在 Lua 中,则是记录下这个发出的消息包的 session ,记录 session 和挂起的 corutine 在一张对应表中。之后,一但收到回应包里有相同的 session ,对应的 coroutine 则被唤醒 resume 。
    每个服务可以使用不同的协议组,则是在底层由 type 参数区分的。在 lua 层,可以为每个不同的 type 编写不同的 dispatch 函数。默认仅提供了 RESPONSE 消息的处理方法,每个独立的 lua 服务,都需要去实现自己可以支持的协议类型的处理函数。
    比如,我已经提供了一种远程对象的支持方法 ,你可以选择使用它,也可以选择不用。如果你想用它,只需要 require 对应的 lua 模块,其处理函数就被自动注入了消息分发器中。
     大多数 lua 服务也可以使用一种简易的消息编码协议,我称为 lua 协议。因为它仅仅是简单的把 Lua 支持的类型系列化起来,另一个 Lua 服务可以顺利的解开它们。这样,看起来请求(用 Lua 编写的)远程服务和调用本地函数一样简单。
    如果考虑到跨语言兼容性的话,也可以使用文本协议。从 C 服务去请求一个 Lua 服务处理起来可能更简单。当然用的更多的是反过来的场合,用 Lua 去调用 C 编写的一些基础服务。若是使用方便 Lua 规范的编码方式,C 服务的编写就会相对复杂。而空格分割的字符串参数,C 语言则可以用 sscanf 轻松分割开。
comments:
    Q:
        大神, 求解以下代码块.实在是不明白.
        struct lua_State *L = luaL_newstate();
        luaL_openlibs(L); // link lua lib
        lua_close(L);
        L = luaL_newstate();
    A:
        如果不主动调用一下
        luaL_openlibs
        编译器可能没有把 lua 的标准库中的符号链接到执行文件中。(因为没有使用过)
        这样做有没有实际意义取决于你系统中的 lua 库是静态库还是动态库。但无论怎样,调用一下也没什么副作用。
    
    Q:
        我发现了一个问题,感觉像是bug:往lua脚本里里面压入buffer(一个lightuserdata数据),然后脚本再次send这个lightuserdata到另外一个lua脚本,会造成double free。
        因为往lua里面压lightuserdata,会使用dontcopy,不再malloc新的msg指针,但是返回却固定为0,dispatch后一定会free。
        没有其他人发现吗。。。
    A:
        在 lua 里直接使用 C 指针(lightuserdata) 属于危险行为, 或是 low level api.
        只有在需要性能且明确使用流程的时候才可以使用,否则一律用 string 传递消息.
        另外,发送 buffer 必须严格满足生产消费模式. 即发送(生产完毕)后,发送者就失去了对发送内存的访问权. 逻辑上, 发送一个 buffer 等于 free 了这个 buffer
        
    Q:
        还是没搞懂PTYPE_CLIENT 这个类型的数据是哪里来的(什么样的外部数据?)。
        我用gdb跟踪,新client的连接和后续client的数据都是PTYPE_SOCKET类型的
    A:
        PTYPE_CLIENT 是历史遗留的. 原本 gate 服务是先于现在的 socket 模块实现的.
        PTYPE_CLIENT 是 gate 给外部连接定义出来的消息类型. 有了 socket 模块后, gate 不是必须品, PTYPE_CLIENT 这类消息也不一定有了.
        gate 现在已经不是核心组件. 建议直接用 socket 库编写你需要的业务.

        现有 gate 的功能,可以通过阅读 gate 的源码了解.


http://blog.codingnow.com/2013/08/exit_skynet.html 如何安全的退出 skynet
Skynet 在最开始设计的时候是没有仔细考虑系统退出的问题的,后来为了检测内存泄露的问题,加入了一个 ABORT 指令可以杀掉全部活着的服务。
安全的退出整个系统,尤其是一个分布式系统,总是一个复杂的问题。我们的内部版为这些做了大量的工作。但我觉得做的不太干净,所以一直没有把代码合并到开源版。
今天晓靖又重提这个退出系统的方案,就又把这个问题拿出来讨论了一下。我的个人观点是,如何安全的退出和业务逻辑相关性很强,在框架中加入大量的通知机制让每个服务自己协调解决即增加了框架的复杂度,又不降低系统的设计难度。我们目前内部版本中增加的服务退出消息以及服务优先级等等这些机构过于复杂,且没有很好的解决问题。
曾经我想过按操作系统的做法,杀掉一个服务先发送一个消息给这个服务,让服务有一小段时间可以做最好的事务处理。等超时后,就强行把服务杀掉。这样显得不够安全,但若等服务自行干净退出又有可能发生死锁。我觉得,对于整个系统都是自己设计构建的话,其实设计人员是能够理解如何安全的关闭系统的。只有小部分服务需要做善后工作,比如数据库读写层需要把尚未写完的数据写入数据库后才能退出;大部分服务是无状态的,它们可以直接清理;还有一部分服务的关闭过程比较复杂,比如网关,就需要先关闭监听端口,再逐个关闭客户链接,最后才能退出自己。
让每个服务自己收到退出消息后想办法处理好退出流程,不如有一个专门掌管系统关闭的服务来统一协调系统关闭退出的流程。目前的 launcher 服务掌管了大部分服务的启动流程,可以稍加改进来管理退出过程。也可以把这一部分业务独立实现。无论怎样,都不要在框架底层来做太多事情。每个有复杂退出流程的服务,应在启动时把自己上报,退出管理器了解所有系统中活跃的服务,按系统的整体设计来决定退出的时候按怎样的次序做好哪些事情。
框架可以做一点点事情,能让这件事情做起来简洁一点。那就是可以指定一个服务可以监听到其它服务的关闭事件,而不需要服务在退出的时候自行汇报。
我没有在框架层把这个服务退出机制设计成 Erlang 那样的 process link 的方式是因为,n:m 的 process link 会使得管理连接的模块实现的过于复杂。用单点接收所有的服务退出消息,同样也可以在这个单点上进一步实现出 Erlang 那样的 process link 机制来。
目前的接口是放在 skynet.lua 里的 skynet.monitor 方法。在启动脚本里调用它启动一个监听服务。我编写了一个简单的监听服务 simplemonitor.lua ,在每个服务退出时输出一行 log 。用户可以改进这个服务做更多的事情。
        
http://blog.codingnow.com/2013/12/skynet_monitor.html Skynet 的服务监控及远程调用
基于 Actor 模式的框架,比较难解决的问题是当一个 actor 异常退出后如何善后的问题。
Erlang 的做法简单粗暴,它提供了 spawn_link 方法 , 当一个 process (Erlang 的 Actor) 退出后,可以把和它关联的 process 也同时退出。
在 skynet 中,一开始我并不想在底层解决这个问题。我希望所有的 service 都是稳定的。如果一个 service 可能中途退出,那么在上层协调好这个关系。
而且 skynet 借助 lua 的 coroutine 机制,事实上在同一个 lua service 里跑着多个 actor 。一个 lua coroutine 才是一个 actor 。粗暴的将几个 service 在底层绑定生命期不太合适。
但是,如果有 service 有中途退出的可能,那么利用 skynet.call 调用上面的远程方法就变得不可靠。简单的用 timeout 来解决我认为只是回避了问题,而且会带来更多的复杂性。这是我在设计 skynet 时就想避免的。所以我在处理 service 生命期监控的问题上,做的比较谨慎。
前段时间我在 skynet 底层加上了 monitor 机制,可以把服务退出的消息从框架层抛出来,让上层逻辑可以感知到。但我并不想固定处理服务退出事件的逻辑,所以加了一个 skynet.monitor 方法把一个特殊服务注册入框架,让它全权负责处理这类消息。
btw, 这里我刻意回避了使用命名服务的机制,没有把 monitor 起一个特定名字。(例如:早期我就给 launcher 起了名字。)这是因为我觉得具名服务不应该在框架底层实现,而应该放在上层机制中。目前 skynet 支持的名字服务仅仅是出于历史兼容原因存在。新增加的代码我也不想再依赖这个特性了。
同时,我写了一个简单的 monitor 做为示范。在我们自己的项目中,对应的服务逻辑要复杂的多。
显然,光有 monitor 机制仅仅提供了解决问题的可能,把 skynet.call 做的更完备是不够的。今天的 skynet 更新 patch 中,我尝试完善了一下。
首先,增加了 skynet.watch 这个 api ,用于显式关注一些可能不稳定的服务。我不想给原有的 skynet.call 加太多的负担,毕竟大多数服务都是稳定不会退出的。(一旦意外退出,整个系统都会不可用,基于保持运行也意义不大)
skynet.watch 会向 monitor 汇报,当 monitor 发现服务消失的那一刻,将发消息通知。
skynet.call 在调用一个被 watch 的服务上的方法时,把 session/service 记录在一张表里。当收到 monitor 的反馈消息时,将从表中找到需要恢复的 coroutine ,把它唤醒并抛出异常。这样,skynet.call 就不会因为调用服务在处理远程请求过程中异常退出而无法返回的情况了。
目前还没有实现异常传递的机制。也就是当一个 coroutine 发生异常,而它又是被远程调用的方法,那么最好是能把这个异常传递回去。这套机制在我们的项目中有实现,但我还没有想好怎样合并到 github 的 skynet 核心代码树上。
12 月 10 日补充:
想了一下, 发现异常传播可以利用上面的通道进行,只需要增加几行代码即可. 所以今天的 patch 把这个功能加上了。这样,一但 skynet.call 调用的远程方法发生了异常后,调用者这边也会同样抛出一个异常,而不是挂起。

       
http://blog.codingnow.com/2014/04/skynet_gate_lua_version.html 对 skynet 的 gate 服务的重构
由于历史原因,skynet 中的 gate 服务最早是用 C 写的独立服务。后来 skynet 将 socket 的管理模块加入核心后又经历过一次重构,用后来增加的 socket api 重新编写了一遍。
目前,skynet 的各个基础设施逐步完善,并确定了以 lua 开发为主的基调,所以是时候用 lua 重写这个服务了。
如果是少量的连接且不关心性能的话,直接用 skynet 的 lua socket 库即可。这里有一个例子(https://github.com/cloudwu/skynet/blob/master/test/testsocket.lua)。
gate 定位于高效管理大量的外部 tcp 长连接。它不是 skynet 的核心组件,但对于网络游戏业务,必不可少。
skynet 的内核已经集成了 epoll/kqueue 可以高效处理 socket 事件。但是离处理长连接还差一步,那就是对数据流的分包。
skynet 目前的 socket api ,采用回调的方式接收 socket 数据流。在一条 tcp 连接上,无论每次收到多少字节,都会使用 PTYPE_SOCKET 通道转发给绑定这条 tcp 连接的 skynet 服务。它是不关心数据流是如何组织的。
通常,我们会用 长度+数据内容 的形式对 TCP 数据流进行切分。这是在网络游戏中最常见的协议设计方案。
当然,也可以按 http 或其它流行网络协议(pop3 imap 等)那样,以回车换行符以及文本数字的方式来分包,但除了增加切包算法的复杂度外,没有太多好处。
目前 skynet 的 gate 服务约定的协议是,2 字节( 大头编码)表示一个 64K 字节内的数据包,然后接下来就是这个长度的字节数。我曾经考虑过使用 4 字节或 google proto buffer 用的 varint ,但最后都放弃了。
考虑到实现的便捷,通常收到长度后,会在内存考虑指定长度的 buffer 等待后续的数据输入。这样,如果有大量攻击者发送超长包头,就会让服务器内存瞬间消进。所以,这种协议只要实现的不小心,很容易变成攻击弱点。
注:skynet 最早期的 gate 实现反而没有这个问题。因为它使用了单一的 ringbuffer ,只发送包头却不发送数据的连接会在 ringbuffer 回绕的时候被踢掉。
游戏服务器如果只使用一条 TCP 长连接的情况下,单个数据包过大(> 64K),也是不合适的。大包会阻塞应用逻辑(收取和发送它们都需要很长的时间),如果在应用层有心跳控制的话,也很容易造成心跳超时。所以一般在应用层对大数据包再做上层协议的切割处理。在本文的最后,会对此做一些讨论。
gate 的职责应该是保持大量 TCP 长连接,按协议切分。对于不完整的数据包,按连接分别置入独立的缓冲区中。对于每个完整的包,转发给需要的服务。
这里的工作分两部分,分包和转发。
分包以及对不完整的包做缓存是个细活,交个 C 代码去处理比较合适。但转发控制这部分业务比较复杂,lua 做更好。这就是这次重构的指导思想。

这次我把分包的工作放在了一个叫做 netpack 的 lua 扩展库里。参考:lua-netpack.c
然后 gate 的调度逻辑放在了 lua 版的 gate.lua 中。相较于上一版完全用 C 实现,会损失一点性能,但扩展性和可维护性都能提高很多。
最初在设计 gate 的时候,希望可以把多个连接上的数据转发给一个服务处理。比如你想用一个认证服务处理所有连接的最初登陆流程。又不想对网络数据包再打包(加上连接号),因为这样会造成额外的开销。
为了让接受数据的服务区分不同的数据源,我制作了一个代理服务 service_client 用来发送数据。并且在转发的时候,伪造这个代理服务作为数据源(真实的数据来源于 TCP 连接)。这样,处理数据的服务只需要按来源回应数据包就可以让网络数据包正确的返回。
而且,这种做法可以将 TCP 连接上的数据包通过 skynet.filter 包装成 skynet 内部消息格式。即,收到网络数据包 (PTYPE_CLIENT) 时,数据包内其实包含有必要的 session 号,先把 session 和其他数据分离,通过 skynet.filter 分别传给下游。这样就把网络连接从业务层中屏蔽掉了。

这也是为什么 gate 的转发协议需要提供两个服务地址的原因。
这次新写的 gate 继承了这个用法。但过往的实践中,这个用法略显复杂。如果业务简单,其实用不着实现这么多配套服务。直接把 socket fd 交给业务处理的服务,它直接向 socket 发包即可。
重新写的 examples 里的 agent 就按这个思路实现。
examples 展示了简单的客户端服务器通讯协议的封装方法:消息主体使用 json 。在主体前面加上文本的 数字 session 加一个符号 + 或 - 。+ 表示这是一条请求消息,- 表示这条消息是对前面对应 session 号的消息的回应。
由于直接调用 socket api 发送数据包,所以 agent 不再需要和 service_client 配套使用。
原来 C 编写的 client demo 已经删除,换成了 lua 版的 client demo 。网络层使用了一个简单的 socket 库。如果用于实际项目,还需要完善 socket 库(客户端也不一定用 lua 实现)。
这次 examples 只是简单的重新理了一下代码。它还远远不是一个有复杂业务的 demo 。我一直在考虑到底提供一个怎样的 demo 可以完整的展示 skynet 的特点。目前还没有想好。
对于我们已经上线和即将上线的项目,结构比这个 example 要复杂的多。
首先我们使用的是 google proto buffer 协议,作为消息主体。但外围做了一些封装。消息由消息类型、session 消息主体构成。消息类型用自定义的小语言描述,用于消息分发。session 对应 skynet 内部的 session 号。根据消息类型,可以知道消息主体如何编码。
其次,每个连接建立后,不会立刻创建 agent 和它对接,因为这可能使登陆流程(尤其是未完成的登陆流程)给系统造成过大的负载。登陆过程的交互统一转给认证服务处理。认证服务是无状态的,所以可以在系统内启动多份以提高处理效率。认证结束后,才真正创建 agent 和连接对接。
创建 agent 的过程可能比较慢 ,所以我们会在服务启动的时候预先创建好数千个 agent 待用。有一个 agent pool 服务管理这些备用 agent 。一旦系统有空闲,就会不断补充备用。
我们在 TCP 连接做了进一步的加密处理,目前是对 gate 做了一些改造完成的。由于 gate 并非 skynet 核心模块,所以可以复制一份出来定制需要的功能。将来还希望加上断线重连的特性。
下一步,我希望给 skynet 的 socket 层加上低优先级数据包的队列。就是说,从现在的单一队列改成两个。如果你需要启用第二队列,那么这将是一个低优先级的发送队列。socket 发送规则如下:
    如果 socket 可写,且两个队列都为空,立即发送。
    如果上一次有一个数据包没有发送完,无论它在哪个队列里,都确保先将其发送完。
    如果高优先级队列有数据包,先保证发送高优先级队列上的数据。
    如果高优先级队列为空,且低优先级队列不为空,发送一个低优先级队列上的数据包。
这可以用来解决前面提到的大数据包的问题。
在应用层,我们可以把大数据包分割成不大于 4K 的小数据包。给大数据包添加一个唯一的编号。这些分割后的小数据包进入低优先级队列,那么它们就会被切碎传输给对端。一个大数据包可能被切成数百份,其它的数据会穿插再其间。尤其不会影响心跳控制包的传输。
例如,客户端向服务器请求拍卖行目录,或是全球排行榜这种数据量很大的数据时,走这个大数据包通道,就不会影响正常的交互流程了。
        
http://blog.codingnow.com/2014/06/skynet_cluster.html skynet 的集群方案
老方案:
    在过去,skynet 的集群限制在 255 个节点,为每个服务的地址留出了 8bit 做节点号。消息传递根据节点号,通过节点间互联的 tcp 连接,被推送到那个 skynet 节点的 harbor 服务上,再进一步投递。
    这个方案可以隐藏两个 skynet 服务的位置,无论是在同一进程内还是分属不同机器上,都可以用唯一地址投递消息。但其实现比较简单,没有去考虑节点间的连接不稳定的情况。通常仅用于单台物理机承载能力不够,希望用多台硬件扩展处理能力的情况。这些机器也最好部署在同一台交换机下。
新方案:
    之前这个方案弹性不够。如果一台机器挂掉,使用相同的节点 id 重新接入 skynet 的后果的不可预知的。因为之前在线的服务很难知道一个节点下的旧地址全部失效,新启动的进程的内部状态已经不可能和之前相同。
    所以,我用更上层的 skynet api 重新实现了一套更具弹性的集群方案。
    和之前的方案不同,这次我不打算让集群间的通讯透明。如果你有一个消息是发放到集群内另一台机器中的某个服务的,需要用特别的集群消息投递 api 。节点本身用字符串名字,而不是 id 区格。集群间的消息用统一的序列化协议(为了简化协议)。
     这套新的方案,可以参考 examples 下的 config.c1 和 config.c2 分别启动两个节点相互通讯。
    如果使用这套方案,就可以不用老的多节点机制了(当然也可以混用)。为了简化配置,你可以将 skynet 配置为 harbor = 0 ,关闭老的多节点方案。这样,address standalone master 等配置项都不需要填写。
    取而代之的是,配置一个 cluster 项,指向一个 lua 文件,描述每个节点的名字和地址。

    新的 cluster 目前只支持一个 rpc call 方法 。用来调用远程服务。api 和 skynet.call 类似,但需要给出远程节点的字符串名字,且通讯协议必须用 lua 类型。
    这套新方案可以看成是对原有集群的一个补充。当你需要把多台机器部署到不同机房,节点间的关系比较弱,只是少部分具名服务间需要做 rpc 调用,那么新的方案可能更加合适一些。因为当远程节点断开联系后,发起 rpc 的一方会捕获到异常;且远程节点用名字索引,不受 255 个限制。断开连接后,也可以通过重连恢复服务。
        
http://blog.codingnow.com/2014/06/skynet_harbor_redesign.html 重新设计并实现了 skynet 的 harbor 模块
老设计:
    skynet 是可以启动多个节点,不同节点内的服务地址是相互唯一的。服务地址是一个 32bit 整数,同一进程内的地址的高 8bit 相同。这 8bit 区分了一个服务处于那个节点。
    每个节点中有一个特殊的服务叫做 harbor (港口) ,当一个消息的目的地址的高 8 位和本节点不同时,消息被投递到 harbor 服务中,它再通过 tcp 连接传输到目的节点的 harbor 服务中。
    不同的 skynet 节点的 harbor 间是如何建立起网络的呢?这依赖一个叫做 master 的服务。 这个 master 服务可以单独为一个进程,也可以附属在某一个 skynet 节点内部(默认配置)。
    master 会监听一个端口(在 config 里配置为 standalone 项),每个 skynet 节点都会根据 config 中的 master 项去连接 master 。master 再安排不同的 harbor 服务间相互建立连接。

    上面蓝色的是 master 服务,下面 5 个 harbor 服务间是互连的。master 又和所有的 harbor 相连。
    这就是早期的 skynet 分布式集群方案。有一篇 2 年前的 blog 记录了当时的想法,可以一窥历史。
    由于历史变迁,从早期的手脚架不全,到如今的 skynet 的基础设置日臻完善。这部分代码也改写过很多次,每每想做大的改动,都不敢过于激进。
    最近,我们的一个新项目要上线,由于运营方只能提供虚拟机,且网络状态不是很好,暴露了 skynet 在启动组网阶段的一些时序漏洞。所以这个周末我咬牙把这块东西全部重新设计实现了。
    当初为了简化设计,每两台机器间的连接使用了两条 TCP 连接。数据流在每条连接上都是单向的,即谁发起连接,谁就在这个连接上单向推送数据。这样做的好处是,如果双方都是可信的机器的话,可以省去握手的协议。
新设计:
     现在,节点间不再需要两条连接,而只用一条。每个节点加入网络(首先接入 master)后,由 master 通知它网络中已有几个节点,他会等待所有现存节点连接过来。所以连接建立后,就关闭监听端口。
    如果再有新节点加入网络,老节点主动去连接新节点。这样做的好处是,已经在工作的节点不需要打开端口等待。
    这套代码实现在 cmaster.lua 和 cslave.lua 中
,取代原来的 service_master.c 和 service_harbor.c ,用 lua 编写有更大的弹性。这两个服务还负责同步 skynet 网络中的全局可见的服务名字,原本在 C 版本中,这部分实现的很繁琐,改为 lua 后,清晰了许多。
    原有的远程消息代理模块被剥离出来,还是放在 service_harbor.c 中,比之前的代码篇幅小了很多。它只负责管理 tcp 连接的 fd, 而不必操心建立连接的过程。这样,也不再依赖额外的 gate 服务,只需要做简单的拼包处理即可。
    至此,C 版本的 gate 已经完全脱离了核心模块。在一般应用中,后来写的 lua 版 gate (http://blog.codingnow.com/2014/04/skynet_gate_lua_version.html)已经足够用了。
 类似资料:

相关阅读

相关文章

相关问答