当前位置: 首页 > 工具软件 > Skynet > 使用案例 >

skynet入门笔记

壤驷文华
2023-12-01

入门观看博客笔记:

博客笔记:

第五篇 关于服务别名

  • 别名注册与查询接口:

在skynet中,服务别名可以分为两种:

  • 一种是本地别名,本地别名只能在当前skynet节点使用,本地别名必须使用. 开头,例如:.testalias
  • 一种是全局别名,全局别名可以在所有skynet中使用,全局别名不能以. 开头, 例如:testalias

取别名:

1、给当前服务定一个别名 可以是全局别名 也可以是本地别名

skynet.register(aliasname)

2、给指定的服务定一个别名,可以是全局可以是本地

skynet.name(aliasname,servicehandler)

查询别名:

1、查询本地名为aliasname的服务 返回servicehandler 不存在就是返回nil

skynet.localname(aliasname)

2、查询别名为aliasname的服务,可以是全局别名也可以是本地的

a、当查询本地别名时,返回servicehandler,不存在就返回nil

b、当查询全局别名时,返回servicehandler,不存在就阻塞等待到该服务初始化完成

local skynet = require “skynet.harbor”

Harbor.queryname(aliasname)

注意:本地别名跟全局别名可以同时存在

skynet的全局别名服务是在cslave里面实现的,现在不允许二次修改全局别名绑定关系,所以全局别名一般用来给一个永远不会退出的服务来启用。

​但是有些情况下,我们确实需要二次修改全局别名绑定关系,那么这个时候,我们可以尝试去修改一下cslave.lua文件

全局唯一服名与这里的全局别名是两个概念的名词。

​ 全局唯一服名称: 是用来标识服务是唯一的,服务名称一般就是脚本名称,无法更改。

​ 全局别名: 是用来给服务起别名的,既可以给普通服起别名,也可以给全局唯一服起别名。

​ 他们两种名字是在不同的体系中的,有各种的起名字的方式,以及查询的方式。

​ 所以不要尝试用skynet.queryservice查询一个全局别名,也不要尝试使用harbor.queryname去查询一个全局唯一服。

第六篇 关于服务调度相关内容

  • 服务调度:  

1、skynet.sleep(time)

让当前的任务等待time * 0.01s

2、skynet.fork(func,…)

启动一个新的任务去执行函数func,其实就是开了一个协程 函数调用完成将返回线程句柄

虽然可以使用原生的coroutine.create来创建协程,但是会打乱skynet的工作流程

3、skynet.yield()

让出当前的任务执行流程,使本服务内其他任务有机会执行,随后会继续运行

4、skynet.wait()

让出当前任务执行流程 直到用wakeup唤醒

5、skynet.wakeup()

唤醒用wait或者sleep后处于等待状态的任务

6、skynet.timeout(time, func)

设定一个定时触发函数func,在time * 0.01秒后触发

7、skynet.starttime()

返回当前进程的启动 UTC 时间(秒)

8、skynet.now()

返回当前进程启动后经过的时间(0.01秒)

9、skynet.time()

通过starttime 和 now 计算出当前UTC 时间(秒)

注意:

每次使用skynet.fork其实都是从协程池中获取未被使用的协程,并把该协程加入到fork队列中,等待一个消息调度,然后会依次把fork队列中协程拿出来执行一遍,执行结束后,会把协程重新丢入协程池中,这样可以避免重复开启关闭协程的额外开销。

skynet.fork 创建的线程其实通过lua协程来实现的,即一个协程占用执行权后,其他的协程需要等待。

其实skynet.start服务启动函数实现中,就已经启动了一个timeout为0s的定时器,来执行通过skynet.start函数传参得到的初始化函数。其目的是为了让skynet工作线程调度一次新服务。这一次服务调度最重要的意义在于把fork队列中的协程全部执行一遍。

  • 错误处理:

使用assert会终止掉当前协程 不会再往下执行

如果不想中断,可以使用pcall来捕捉异常

第七章: 消息机制

​ skynet中的每一个服务都有一个独立的lua虚拟机,逻辑上服务之间是相互隔离的,那么你就不能使用传统意义上的LUA全局变量来进行服务间通信了。

​ 在skynet中服务之间可以通过skynet消息调度机制来完成通信。skynet中的服务是基于actor模型设计出来的,每个服务都可以接收消息,处理消息,发送应答消息。

​ 每条 skynet 消息由 6 部分构成:消息类型、session 、发起服务地址 、接收服务地址 、消息 C 指针、消息长度。

  • 消息类型:

最常用的是PTYPE_LUA,对应到lua层,叫做lua消息 ,大部分服务一般使用这种消息,默认情况下,PTYPE_REPSONSE、PTYPE_ERROR、PTYPE_LUA三种消息类型已经注册(查看源码了解情况),如果想使用其他的消息类型,需要自己显示注册消息 类型。

  • 注册消息处理函数:

当我们需要在一个服务中监听指定类型的消息,就需要在服务启动的时候先注册该类型的消息的监听,通常是在服务的入口函数 skynet.start 处通过调用 skynet.dispatch 来注册绑定

skynet.dispatch(type,function(session, source,…) … end)

--服务启动入口

skynet.start(function()

    --注册"lua"类型消息的回调函数

    skynet.dispatch("lua", function(session, address, ...)

        dosomething(...)

    end)

end)

可以注册新的消息类别,方法是使用 skynet.register_protocol。例如你可以注册一个以文本方式编码消息的消息类别。通常用 C 编写的服务更容易解析文本消息。skynet 已经定义了这种消息类别为 skynet.PTYPE_TEXT,但默认并没有注册到 lua 中使用。新的类别必须提供 pack 和 unpack 函数,用于消息的编码和解码。

  • 打包与解包消息:

 skynet中的消息在发送之前必须要把参数进行打包,然后才发送,接受方收到消息后会自动根据指定的解包函数进行解包,最常用的打包解包函数为skynet.pack与skynet.unpack.

skynet.pack(...)打包后,会返回两个参数,一个是C指针msg指向数据包的起始地址,sz一个是数据包的长度。msg指针的内存区域是动态申请的。

​ skynet.unpack(msg, sz)解包后,会返回一个参数列表。需要注意这个时候C指针msg指向的内存不会释放掉。如果msg有实际的用途,skynet框架会帮你在合适的地方释放掉,如果没有实际的用途,自己想释放掉可以使用skynet.trash(msg, sz)释放掉。

  • 发送消息的方法:

--用 type 类型向 addr 发送未打包的消息。该函数会自动把...参数列表进行打包,默认情况下lua消息使用skynet.pack打包。addr可以是服务句柄也可以是别名。

skynet.send(addr, type, ...)

--用 type 类型向 addr 发送一个打包好的消息。addr可以是服务句柄也可以是别名。

skynet.rawsend(addr, type, msg, sz) 

  • 发送必须响应的消息:

--用默认函数打包消息,向addr发送type类型的消息并等待返回响应,并对回应信息进行解包。(自动打包与解包。)

skynet.call(addr, type, ...) 

--直接向addr发送type类型的msg,sz并等待返回响应,不对回应信息解包。(需要自己打包与解包)

skynet.rawcall(addr, type, msg, sz) 

  • 响应消息的方法:

skynet.ret --目标服务消息处理后需要通过该函数将结果返回

skynet.retpack(...) --将消息用skynet.pack 打包,并调用 ret 回应。

  • session的意义:

session只有在使用skynet.call或者skynet.rawcall发送消息的时候才有意义。

​ 因为有可能一个服务开了多个协程去call消息,然后多个协程都在等待应答消息,回来了一个应答,那么到底是唤醒哪个协程,就可以通过session来判断了,skynet中的session能保证本服务中发出的消息是唯一的。消息与响应一一对应起来。

  • 使用skynet.response响应消息:

在使用skynet.ret或者skynet.retpack进行应答时,必须要保证与接受请求时在同一个协程中(源服务地址与协程句柄已经一一对应),也就是说在哪个协程接受的请求也必须在这个协程去做响应。

  • skynet.call失败的情况:

​ 当一个服务发起请求skynet.call 后等待应答,但是响应服务却退出了(调用skynet.exit) ,响应服务退出的时候,会自动给未答复的请求发送一个error 消息,告诉它可以从skynet.call阻塞返回了,请求的服务会直接报一个错误。

  • 服务重入问题:

​ 同一个 skynet 服务中的一条消息处理中,如果调用了一个阻塞 API ,那么它会被挂起。挂起过程中,这个服务可以响应其它消息。这很可能造成时序问题,要非常小心处理。

​ 换句话说,一旦你的消息处理过程有外部请求,那么先到的消息未必比后到的消息先处理完。且每个阻塞调用之后,服务的内部状态都未必和调用前的一致(因为别的消息处理过程可能改变状态)。

  • 服务临界区:

​ skynet.queue 模块可以帮助你回避这些服务重入或者伪并发引起的复杂性。

local queue = require "skynet.queue"

local cs = queue()  --获取一个执行队列

cs(f, ...) --将f丢到队列中执行

  • 注册其他信息:

register_protocol

  • 代理服务:

​ 在 skynet 中,有时候为一个服务实现一个前置的代理服务是很有必要的。所谓代理服务,就是向真正的功能服务发起请求时,请求消息发到另一个代理服务中,由这个代理服务转发这个请求给真正的功能服务。同样,回应消息也会被代理服务转发回去。

  • 节点间消息通信

第八章: 频道

local mc = require "skynet.multicast"

引入 multicast 模块后,你可以使用 skynet 的组播方案。你可以自由创建一个频道,并可以向其中投递任意消息。频道的订阅者可以收到投递的消息。

local channel = mc.new()  -- 创建一个频道,成功创建后,channel.channel 是这个频道的 id 。

local channel2 = mc.new {

  channel = channel.channel,  -- 绑定上一个频道

  dispatch = function (channel, source, ...) end,  -- 设置这个频道的消息处理函数

}

​ 如上面的例子,new 函数可以接收一个参数表。channel 是频道 id ,dispatch 是订阅消息的回调函数。如果你不给出 channel id ,则新创建出一个频道来。

​ 通常,由一个服务创建出频道,再将 .channel 这个 id 通知别的地方。获得这个 id 的位置,都可以绑定这个频道。

​ channel:publish(...) 可以向一个频道发布消息。消息可以是任意数量合法的 lua 值。

​ 绑定到一个频道后,默认并不接收这个频道上的消息(也许你只想向这个频道发布消息)。你需要先调用 channel:subscribe() 订阅它。

​ 如果不再想收到该频道的消息,调用 channel:unsubscribe 。

注意:没有引用的 channel 对象,gc 会收回掉,并自动 unsubscribe 。所以,如果需要 channel 的 dispatch 函数持续工作,你需要保持 channel 对象的引用。

​ 当一个频道不再使用,你可以调用 channel:delete() 让系统回收它。注:多次调用 channel:delete 是无害的,因为 channel id 不会重复使用。在频道被销毁后再调用 subscribe 或 publish 等也不会引起异常,但订阅是不起作用的,消息也不再广播。

第九章: socket通信

常用API:

local socket = require "skynet.socket"

--建立一个 TCP 连接。返回一个数字 id 。

socket.open(address, port)      

--关闭一个连接,这个 API 有可能阻塞住执行流。因为如果有其它 coroutine 

--正在阻塞读这个 id 对应的连接,会先驱使读操作结束,close 操作才返回。

socket.close(id)

--在极其罕见的情况下,需要粗暴的直接关闭某个连接,而避免 socket.close 的阻塞等待流程,可以使用它。

socket.close_fd(id)

--强行关闭一个连接。和 close 不同的是,它不会等待可能存在的其它 coroutine 的读操作。

--一般不建议使用这个 API ,但如果你需要在 __gc 元方法中关闭连接的话,

--shutdown 是一个比 close 更好的选择(因为在 gc 过程中无法切换 coroutine)。与close_fd类似

socket.shutdown(id)

--[[

    从一个 socket 上读 sz 指定的字节数。

    如果读到了指定长度的字符串,它把这个字符串返回。

    如果连接断开导致字节数不够,将返回一个 false 加上读到的字符串。

    如果 sz 为 nil ,则返回尽可能多的字节数,但至少读一个字节(若无新数据,会阻塞)。

--]]

socket.read(id, sz)

--从一个 socket 上读所有的数据,直到 socket 主动断开,或在其它 coroutine 用 socket.close 关闭它。

socket.readall(id)

--从一个 socket 上读一行数据。sep 指行分割符。默认的 sep 为 "\n"。读到的字符串是不包含这个分割符的。

--如果另外一端就关闭了,那么这个时候会返回一个nil,如果buffer中有未读数据则作为第二个返回值返回。

socket.readline(id, sep) 

--等待一个 socket 可读。

socket.block(id)

--把一个字符串置入正常的写队列,skynet 框架会在 socket 可写时发送它。

socket.write(id, str) 

--把字符串写入低优先级队列。如果正常的写队列还有写操作未完成时,低优先级队列上的数据永远不会被发出。

--只有在正常写队列为空时,才会处理低优先级队列。但是,每次写的字符串都可以看成原子操作。

--不会只发送一半,然后转去发送正常写队列的数据。

socket.lwrite(id, str) 

/Users/admin/skynet/my_workspace/mongotest/mongodb.lua

/Users/admin/skynet/test/testmongodb.lua

-监听一个端口,返回一个 id ,供 start 使用。

socket.listen(address, port) 

--[[

    accept 是一个函数。每当一个监听的 id 对应的 socket 上有连接接入的时候,都会调用 accept 函数。

这个函数会得到接入连接的 id 以及 ip 地址。你可以做后续操作。

    每当 accept 函数获得一个新的 socket id 后,并不会立即收到这个 socket 上的数据。

这是因为,我们有时会希望把这个 socket 的操作权转让给别的服务去处理。accept(id, addr)

]]--

socket.start(id , accept) 

--[[

    任何一个服务只有在调用 socket.start(id) 之后,才可以读到这个 socket 上的数据。

向一个 socket id 写数据也需要先调用 start 。

    socket 的 id 对于整个 skynet 节点都是公开的。也就是说,你可以把 id 这个数字

通过消息发送给其它服务,其他服务也可以去操作它。skynet 框架是根据调用 start 这个 

api 的位置来决定把对应 socket 上的数据转发到哪里去的。

--]]

socket.start(id)

--清除 socket id 在本服务内的数据结构,但并不关闭这个 socket 。

--这可以用于你把 id 发送给其它服务,以转交 socket 的控制权。

socket.abandon(id) 

--[[

    当 id 对应的 socket 上待发的数据超过 1M 字节后,系统将回调 callback 以示警告。

function callback(id, size) 回调函数接收两个参数 id 和 size ,size 的单位是 K 。

    如果你不设回调,那么将每增加 64K 利用 skynet.error 写一行错误信息。

--]]

socket.warning(id, callback) 

第十二章: Snax服务入门

  • Snax服务基础API

1、启动snax服务的API:

local snax = require "snax"

snax.newservice(name, ...) --可以把一个服务启动多份。传入服务名和参数,它会返回一个对象,用于和这个启动的服务交互。如果多次调用 newservice ,即使名字相同,也会生成多份服务的实例,它们各自独立,由不同的对象区分。注意返回的不是服务地址,是一个对象。

snax.uniqueservice(name, ...) --和上面 api 类似,但在一个节点上只会启动一份同名服务。如果你多次调用它,会返回相同的对象。

snax.globalservice(name, ...) --和上面的 api 类似,但在整个 skynet 网络中(如果你启动了多个节点),只会有一个同名服务。

2、查询snax服务:

snax.queryservice(name) --查询当前节点的具名服务,返回一个服务对象。如果服务尚未启动,那么一直阻塞等待它启动完毕。

snax.queryglobal(name) --查询一个全局名字的服务,返回一个服务对象。如果服务尚未启动,那么一直阻塞等待它启动完毕。

snax.self() --用来获取自己这个服务对象,与skynet.self不同,它不是地址。

3、snax服务退出:

snax.kill(obj, ...) --如果你想让一个 snax 服务退出,调用 

snax.exit(...) --退出当前服务,它等价于 snax.kill(snax.self(), ...) 。

4、通过snax服务地址获取snax服务对象

对于匿名服务,你无法在别处通过名字得到和它交互的对象。如果你有这个需求,可以把对象的handle通过消息发送给别人。 handle 是一个数字,即 snax 服务的 skynet 服务地址。

--把handle转换成服务对象。这里第二个参数需要传入服务的启动名,以用来了解这个服务有哪些远程方法可以供调用。当然,你也可以直接把 .type 域和 .handle 一起发送过去,而不必在源代码上约定。

snax.bind(handle, typename) 

snax启动查找服务路径是config.path的snax变量来指定

snax = root.."examples/?.lua;"..root.."test/?.lua;".."my_workspace/?.lua" --添加my_workspace路径

  • 最简单的snax服务:

每个 snax 服务中都需要定义一个 init 函数,启动这个服务会调用这个函数,并把启动参数传给它。

snax 服务还可以定义一个 exit 函数用于响应服务退出的事件,同样能接收一些参数。

​ 和标准的 skynet 服务不同,这些参数的类型不受限制,可以是 lua 的复杂数据类型。(而 skynet 服务受底层限制,只可以接受字符串参数)

  • Snax服务请求:

 snax请求分为无响应请求与有响应请求。

​ 对snax服务发请求的方法

--无响应请求,obj是snax对象,post表示无响应请求,CMD具体的请求命令,...为请求参数列表,发送完马上返回

obj.post.CMD(...)

--有响应请求,obj是snax对象,req表示有响应请求,CMD具体的请求命令,...为请求参数列表,发送完等待响应

obj.req.CMD(...)

第十四章: 加密了解

  • 加密算法:

package.cpath = "luaclib/?.so"

local crypt = require "client.crypt"

--如果在skynet中使用直接 local crypt = require "skynet.crypt"

--dhexchange转换8字节的key

crypt.dhexchange(key)

--通过key1与key2得到密钥

crypt.dhsecret(key1, key2)

第十六章: Mysql 

  • 连接mysql:

local skynet = require "skynet"

local mysql = require "skynet.db.mysql" --引入模块

--连接成功db返回非nil

local db=mysql.connect({

        host="x.x.x.x",   

        port=x,

        database="skynet", 

        user="root",

        password="123456",

        max_packet_size = 1024 * 1024, --数据包最大字节数

        on_connect = on_connect         --连接成功的回调函数

    })

--关闭连接

db:disconnect()

  • 执行mysql语句:

执行SQL语句可以使用db:query(sql),参数sql可以填任何你想要执行的SQL语句。

 db:query的调度情况

​ db:query通过给mysql服务端发送sql语句,并且阻塞等待mysql服务端返回结果,这个过程当中,db.query

会自动让出当前协程的执行权,等待skynet的下次调度。

第十八章:

http协议的服务分为http服务端与http客户端,skynet服务作为http服务端的时候,可以像其他的web服务器一样,接收http请求并给与应答。skynet的服务作为http客户端的时候,可以通过http协议像远端发送请求并等待得到应答。

  • http服务端:

skynet.httpd 是一个独立于 skynet 的,用于 http 协议解析的库,它本身依赖 socket api 的注入。使用它,你需要把读写 socket 的 API 封装好,注入到里面就可以工作。

​ skynet.sockethelper 模块将 skynet 的 Socket API 封装成 skynet.httpd 可以接受的形式:阻塞读写指定的字节数、网络错误以异常形式抛出。

cluster集群的api:

1、cluster.reload(cfg) : 让本节点(重新)加载节点配置,参数cfg是一个lua表,指示集群中各节点的地址

例如

cluster.reload({

node1 = "127.0.0.1:xxxx"

node2 = “127.0.0.1:xxxx”

})

指明集群中有名为”node1“和”node2“的两个节点,node1监听本地7001端口,node2监听本地7002端口。

2、cluster.open(node):启动节点。节点1需要调用cluster.open(“node1”)、节点2需要调用cluster.open(“node2”),这样它们才能知道自己是cluster.reload中的哪一项,并开启对应的端口监听。

3、cluster.send(node, address, cmd, …):向名为node节点、地址为address的服务推送一条消息,这里参数cmd代表消息名

4、cluster.call(node, address, cmd, …):它与cluster.send的功能相似,都是向另一个服务推送消息。不同的是,它是个阻塞方法,会等待对方的回应。通过cluster发送的消息均为”lua“类型,无需指定

5、cluster.proxy(node, address):为远程节点上的服务创建一个本地代理服务,它会返回代理对象,之后可以用skynet.send、skynet.call操作该代理

github入门教学:

云风博客:macosx

Bootsrtap:

Skynet由一个或者多个进程构成,每个进程称为一个skynet节点。

1.第一个启动的服务是logger。负责之后服务中的log输出。logger是一个简单的c服务,skynet_error会把字符串发送给它,在config文件中可以为logger配置项配置输出文件名,表示输出到标准输出。

2.bootstrap关系这skynet运行的第二个服务。通常通过这个服务把整个系统启动起来。默认的配置项是snlua bootstrap 意味着启动snlua服务 并传递bootstrap作为参数 snlua是lua沙盒服务 bootstrap会根据配置的luaservice匹配到最终的lua脚本上。如果按照默认的配置这个脚本是service/bootstrap.lua。

这段脚本通常会根据standalone配置判断你启动的是一个master节点还是slave节点。如果是master节点还会进一步通过harbor是否配置为0来判断你是否启动的是一个单节点的skynet网络。

Config:

Skynet启动的时候会读取里面必要的配置项,并将暂时用不到的配置项以字符串形式保存在skynet内部的env表中。这些配置项可以通过skynet.getenv获取。

接下来展示必要配置项:

Thread:启动多少个线程。通常不超过实际拥有的cpu核心数。

bootstrap:skynet启动的第一个服务以及启动参数。通常指的是 service/bootstrap.lua 这段代码。

Cpath:用c编写的服务模块的位置。通常指cservice下的那些.so文件。如果你的系统的动态库不是以 .so 为后缀,需要做相应的修改。这个路径可以配置多项,以 ; 分割。

在bootstrap下的一些配置项:

logger 它决定了 skynet 内建的 skynet_error 这个 C API 将信息输出到什么文件中。如果 logger 配置为 nil ,将输出到标准输出。你可以配置一个文件名来将信息记录在特定文件中。

logservice 默认为 "logger" ,你可以配置为你定制的 log 服务(比如加上时间戳等更多信息)。可以参考 service_logger.c 来实现它。注:如果你希望用 lua 来编写这个服务,可以在这里填写 snlua ,然后在 logger 配置具体的 lua 服务的名字。在 examples 目录下,有 config.userlog 这个范例可供参考。

logpath 配置一个路径,当你运行时为一个服务打开 log 时,这个服务所有的输入消息都会被记录在这个目录下,文件名为服务地址。

standalone 如果把这个 skynet 进程作为主进程启动(skynet 可以由分布在多台机器上的多个进程构成网络),那么需要配置standalone 这一项,表示这个进程是主节点,它需要开启一个控制中心,监听一个端口,让其它节点接入。

address 当前 skynet 节点的地址和端口,方便其它节点和它组网。注:即使你只使用一个节点,也需要开启控制中心,并额外配置这个节点的地址和端口。

master 指定 skynet 控制中心的地址和端口,如果你配置了 standalone 项,那么这一项通常和 standalone 相同。

harbor 可以是 1-255 间的任意整数。一个 skynet 网络最多支持 255 个节点。每个节点有必须有一个唯一的编号。

start 这是 bootstrap 最后一个环节将启动的 lua 服务,也就是你定制的 skynet 节点的主程序。默认为 main ,即启动 main.lua 这个脚本。这个 lua 服务的路径由下面的 luaservice 指定。

enablessl 默认为空。如果需要通过 ltls 模块支持 https ,那么需要设置为 true 。

DataCenter

datacenter 可用来在整个 skynet 网络做跨节点的数据共享。

datacenter 类似一个全网络共享的注册表。它是一个树结构,任何人都可以向其中写入一些合法的 lua 数据,其它服务可以从中取出来。所以你可以把一些需要跨节点访问的服务,自己把其地址记在 datacenter 中,需要的人可以读出。

local datacenter = require “skynet.datacenter”

三种方法:

Datacenter.set(key1,key2,…,value):可以向key1,key2设置一个值value 这个api至少需要两个参数,没有特别限制树结构的层级数

Datacenter.get(key1,kye2,…):从key1和key2读一个值,这个api至少需要一个参数,如果传入过多的参数,则用来读出树的一个分支

Datacenter.wait(key1,key2,…):同get方法。但如果读取的分支为nil的时候,这个函数就会阻塞,直到有人更新这个分支才返回。当读写次序不确定的时候,但你需要读到其它地方写入的数据后再做后续事情时,用它比循环尝试读取要好的多。wait 必须作用于一个叶节点,不能等待一个分支。

注意:这三个 api 都会阻塞住当前 coroutine ,留心异步重入的问题。

一个 skynet 服务在某个业务流程被挂起后,即使回应消息尚未收到,它还是可以处理其他的消息的。所以同一个 skynet 服务可以同时拥有多条业务执行线。这些处理流程永远不会真正的并行,它们只是在轮流工作。一段业务会一直运行到下一个io阻塞点,然后切换到下一段逻辑。

两次阻塞的api调用之间,运行过程是原子的。

在同一服务内还可以有多个用户线程,这些线程可以用skynet.fork传入一个函数启动,也可以利用sky net的定时器的回调函数启动。

同一服务内的不同用户线程永远是轮流获得执行权的,每个线程都会需要一个阻塞操作而挂起让出控制权。也会在其他线程让出控制权后再延续运行。

需要注意的是如果一条用户线程永远不屌用阻塞api让出控制权,那么它将永远占据系统工作线程。需要开发者自己小心不要陷入死循坏。

每条skynet消息由6部分组成:消息类型、session、发起服务地址、消息c指针、消息长度

skynet 预定义了一组消息类型,需要开发者关心的有:回应消息、网络消息、调试消息、文本消息、Lua 消息、错误。

回应消息通常不需要特别处理,它有skynet基础库管理,用来调度服务内的coroutine。

网络消息也不必直接处理它,skynet提供了socket封装库,封装管理这类消息,改由一组更友好的socket api方便使用。

调试消息已经被默认的 skynet 基础库处理了。它使得所有 skynet 服务都提供有一些共同的能力。skynet 并不是通过外框架直接控制每个 lua 虚拟机,调试控制台只是通过向对应的服务发送调试消息,服务自身配合运行才得以反馈自身的状态。

真正的业务逻辑是由文本类消息和 Lua 类消息驱动的。它们的区别仅在于消息的编码形式不同,文本类消息主要方便一些底层的,直接使用 C 编写的服务处理,它就是简单字节串;而 Lua 类消息则可以序列化 Lua 的复杂数据类型。大多数情况下,我们都只使用 lua 类消息。

接管某类消息需要在服务的初始化过程中注册该消息的序列化及反序列化函数,以及消息回调函数。

protobuf协议笔记:

一、创建.proto文件,定义数据结构

定义一个名为test的信息 message关键字后跟上消息的名称

message xxx{}

定义了message具有的字段 形式有:

required : 字段只能也必须出现1 次

optional:字段可出现0次或者多次

repeated:字段可出现任意多次

类型有:int32 int64 sint32 sint64 string 32-bit …

字段编号: 0~536870911 (除去19000和19999之间的数字)

字段规则 类型 名称 = 字段编号

可以内嵌message类型 用内嵌的message来生成字段

required 修饰的字段如果没有指定值,将采用默认值填充;

​optional修饰的字段如果没有指定值,直接为空;

使用protobuffer文件:

Local protobuf = require “protobuf”

Protobuf.register_file(protofile) —注册protobuffer文件

protobuf.encode("package.message", { ... }) —根据注册的protofile中的类定义进行序列化,返回得到一个stringbuffer

protobuf.decode("package.message", stringbuffer) —根据注册的protofile中的类定义进行反序列化

Snax服务深入了解:

Init函数:snax服务中都需要定义的函数,启动这个服务的时候就会调用这个函数,并传入启动参数

Exit函数:用于相应服务退出的事件,同样也能接收一些参数

启动snax服务的三种方式:

local snax = require “snax”

一、

Snax.newservice(name,…):可以把一个服务启动多份。传入服务名和参数,它会返回一个对象,用于和这个启动的服务交互。如果多次调用 newservice ,即使名字相同,也会生成多份服务的实例,它们各自独立,由不同的对象区分。

二、

Snax.uniqueservice(name,…):和上面 api 类似,但在一个节点上只会启动一份同名服务。如果你多次调用它,会返回相同的对象。

三、

Snax.globalservice(name,…):和上面的 api 类似,但在整个 skynet 网络中(如果你启动了多个节点),只会有一个同名服务。

总结起来就是前一种方式可以看成是启动了一个匿名服务,启动后只能用地址(以及对服务地址的对象封装)与之通讯;后两种方式都可以看成是具名服务,之后可以用名字找到它。

常用api:

Snax.queryservice(name):查询当前节点的具名服务,返回一个服务对象。如果服务尚未启动,那么一直阻塞等待它启动完毕。

Snax.queryglobal(name):查询一个全局名字的服务,返回一个服务对象。如果服务尚未启动,那么一直阻塞等待它启动完毕。

对于匿名服务,你无法在别处通过名字得到和它交互的对象。如果你有这个需求,可以把对象的 .handle 域通过消息发送给别人。即 snax 服务的 skynet 服务地址。(handle是一个数字)

Handle的接收方可以通过snax.bind(handle,typename)将它转换成服务对象。

snax.kill(obj,…):用于snax服务退出 snax.exit(…)等价于snax.kill(snax.self(),….)

Snax.self():用于获取自己这个服务对象,等价于snax.bind(skynet.self(),SERVICE_NAME)

Snax的远程方法:

snax 服务中可以定义一组函数用于响应其它服务提出的请求,并给出(或不给出)回应。一个非 snax 服务也可以调用 snax 服务的远程方法。

一、

Function response.foobar(…) foobar是方法名,response表示这个方法一定有一个回应。通过函数的返回值来回应远程调用

调用这个远程方法,可以通过obj.req.foobar(…):来调用它。obj是服务对象,req表示这个调用需要接收返回值。foobar是方法的名字

让使用者明确 req 类型是为了让调用者(以及潜在的代码维护者)清楚这次调用是阻塞的,会导致服务挂起,等待对方的回应。

二、

不需要返回值使用function accept.foobar(…)来声明 这里可以有和 response 组相同的方法名。通过 accept 前缀来区分 response 组下同名的方法。这类方法的实现函数不可以有返回值。

调用这类远程方法,使用obj.post.foobar(…) 这个调用不会阻塞。 这里的服务不会挂起

补充:注: 除了用于本地消息或 master/slave 结构下的消息,post 现在也适用于 Cluster 模式, 。

热更新:

snax 是支持热更新的(只能热更新 snax 框架编写的 lua 服务)。但热更新更多的用途是做不停机的 bug 修复,不应用于常规的版本更新。

可以通过snax.hotfix(obj,patchcode)来向obj提交一个patch

在patch中可以包含一个function hotfix(…)函数。在patch提交后立即执行。这个函数可以用来查看或修改 snax 服务的线上状态(因为 local 变量会被关联)。hotfix 的函数返回值会传递给 snax.hotfix 的调用者。

注:不可以在 patch 中增加新的远程方法。

skynet入门常用api总结:

skynet 使用 socket api:

local socket = require “skynet.socket”

Socket.listen(host,port,backlog):监听客户端的连接

Socket.open(address,potr):建立一个tcp连接。返回一个数字id,建立连接时会阻塞当前协程,直到连接建立过程完成

Socket.close(id):关闭一个连接。这个API可能会阻塞住执行流,因为如果有其他协程

Socket.close_fd(id):极其罕见的情况下需要直接关闭某个连接。避免socket.close()的阻塞等待流程。

Socket.shutdown(id):强行关闭一个连接 与close不同的地方在于它不会等待可能存在的其他coroutine的读操作,一般不建议使用这个api,但如果在gc元方法中关闭连接的话,shutdown是一个比close更好的选择(因为gc的过程中无法切换coroutine)。

Socket.read(id,sz):从一个socket上读sz指定的字节数,如果读到了指定长度的字符串,它把这个字符串返回,如果链接断开导致字符串字节数不够,将返回一个false加上读到的字符串。如果sz为nil,则返回尽可能多的字节数,但至少读一个字节(若无新数据,会阻塞)

 socket。readline(id):读取一行的数据,seq指行分割符。默认为”\n” 读到的字符串是不包含这个分隔符的

socket.readall(id):读取所有的数据 直到socket主动断开 或者其他coroutine用socket.close关闭它

socket.block(id):等待一个socket可读

Socket API 中有两个不同的写操作 对应skynet为每个socket设定的两个写队列 通常为们只需要用:

socket.write(id,str):把一个字符串置入正常的写队列 skynet框架会在socket可写时发送它

低优先级的写操作:

socket.lwrite(id,str):把字符串写入低优先级队列。如果正常的写队列还有写操作未完成时,低优先级队列上的数据永远不会被发出。只有在正常写队列为空时,才会处理低优先级队列。但是,每次写的字符串都可以看成原子操作。不会只发送一半,然后转去发送正常写队列的数据。

对于服务器,通常我们需要监听一个端口,并转发某个接入连接的处理权。那么可以用如下 API :

socket.listen(address,port):监听一个接口 返回一个ID 供start使用

Socket.start(id,accept):accept是一个函数。每当一个监听的id对应的socket上有链接接入的时候,都会调用accept。这个函数会得到接入链接的id以及ip地址,你可以做后续操作

每当 accept 函数获得一个新的 socket id 后,并不会立即收到这个 socket 上的数据。这是因为,我们有时会希望把这个 socket 的操作权转让给别的服务去处理。

socket 的 id 对于整个 skynet 节点都是公开的。也就是说,你可以把 id 这个数字通过消息发送给其它服务,其他服务也可以去操作它。任何一个服务只有在调用 socket.start(id) 之后,才可以收到这个 socket 上的数据。skynet 框架是根据调用 start 这个 api 的位置来决定把对应 socket 上的数据转发到哪里去的。

向一个 socket id 写数据也需要先调用 start ,但写数据不限制在调用 start 的同一个服务中。也就是说,你可以在一个服务中调用 start ,然后在另一个服务中向其写入数据。skynet 可以保证一次 write 调用的原子性。即,如果你有多个服务同时向一个 socket id 写数据,每个写操作的串不会被分割开。

Socket.abandon(id):清除socket id 在本服务内的数据结构 但并不关闭这个socket。 可以用于你把id发送给其他服务,以转交socket的控制权

socket.warning(id,callback): 当id对应的socket上待发的数据超过1M 字节后,系统将回调callback以示警告

Function callback(id,size) :回调函数接收两个参数 id 和 size ,size 的单位是 K 。如果你不设回调,那么将每增加 64K 利用 skynet.error 写一行错误信息。一旦产生过至少一次超出警告,那么在待发缓冲区清空时,还会再产生一个 size 为 0 的消息,示意缓冲区已空。

域名查询:

在 skynet 的底层,当使用域名而不是 ip 时,由于调用了系统 api getaddrinfo ,有可能阻塞住整个 socket 线程(不仅仅是阻塞当前服务,而是阻塞整个 skynet 节点的网络消息处理)。虽然大多数情况下,我们并不需要向外主动建立连接。但如果你使用了类似 httpc 这样的模块以域名形式向外请求时,一定要关注这个问题。

skynet 暂时不打算在底层实现非阻塞的域名查询。但提供了一个上层模块来辅助你解决 dns 查询时造成的线程阻塞问题。

Local dns = require “skynet.dns”

在使用前,必须设置 dns 服务器。

dns.server(ip,port):port默认值为53 如果不填写ip 的话 将从/etc/resolv.conf 中找到合适的ip

Dns.resolve(name,ipv6):查询name对应的ip,如果ipv6为true则查询ipv6地址,默认为false 如果查询失败就抛出异常 成功就返回ip 以及一张包含有所有ip的table

Dbs.flush():默认情况下 模块会根据TTL的值cache查询结果。在查询超时的情况下,也可能返回之前的结果

Dbs.flush():可以用来清空cache 注意:cache保存在调用者的服务中 并非针对整个skynet进场 所以推荐写一个独立的dns查询服务统一处理dns查询

lua琐碎:

1、每一个 snlua 服务都绑定了一个Lua VM。 Lua VM实现是线程安全的。

2、Lua查找一个表元素时的规则,其实就是如下3个步骤:

A.在表中查找,如果找到,返回该元素,找不到则继续

B.判断该表是否有元表(操作指南),如果没有元表,返回nil,有元表则继续

C.判断元表(操作指南)中有没有关于索引失败的指南(即__index方法),如果没有(即__index方法为nil),则返回nil;如果__index方法是一个表,则重复1、2、3;如果__index方法是一个函数,则返回该函数的返回值

4、首先在lua中使用“:”定义的函数会自动传入一个名为self的变量,这个变量是隐含的,self同c++中的this一样,表示当前对象的指针:而“.”定义的函数中没有self。

用冒号(:)调用函数时,会默认传一个值(调用者自身)作为第一个参数;

用点(.)调用函数时,则没有。

function Animal.Sleep(time) end

这种写法是一种语法糖 原型是 Animal.Sleep = function(time) end

用:时也是一种语法糖 默认传递一个self参数

function Animal:Eat (food) end

等价于

Function Animal.Eat (self , food ) end

1、package.path用于搜索自己写的库文件或者第三方的库文件。搜索指定路径下以.lua结尾的文件

2、package.cpath用于搜索自己写的so库文件或者第三方的so库文件。搜素指定路径下以.so结尾的文件

3、元表:

__index方法:当调用原表中不存在的索引的时候,会调用__index对应的元素跟方法

__newindex方法:当往元表中添加新的键值对的时候,会调用对应的方法

__add方法:当两个表相加的时候会调用其对应的方法

__call方法:当把表当作方法来调用的时候,会执行call对应的方法

__tostring方法:当把表当字符串输出的时候会调用这个方法,必须要有返回值

__gc方法:这个方法是在 table 被回收时会触发的回调,可以用来做一些 lua内存泄露 及 资源释放 等操作

mongod数据库入门:

  • 后台启动mongodb:

mongod --dbpath /usr/local/var/mongodb --logpath /usr/local/var/log/mongodb/mongo.log --fork

  • --dbpath 设置数据存放目录
  • --logpath 设置日志存放目录
  • --fork 在后台运行

查看mongod服务是否启动:

ps aux | grep -v grep | grep mongod

  • 连接数据库:

mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

  • mongodb:// 这是固定的格式,必须要指定。
  • username:password@ 可选项,如果设置,在连接数据库服务器之后,驱动都会尝试登录这个数据库
  • host1 必须的指定至少一个host, host1 是这个URI唯一要填写的。它指定了要连接服务器的地址。如果要连接复制集,请指定多个主机地址。
  • portX 可选的指定端口,如果不填,默认为27017
  • /database 如果指定username:password@,连接并验证登录指定数据库。若不指定,默认打开 test 数据库。
  • ?options 是连接选项。如果不使用/database,则前面需要加上/。所有连接选项都是键值对name=value,键值对之间通过&或;(分号)隔开

使用用户名和密码连接到 MongoDB 服务器,你必须使用 'username:password@hostname/dbname' 格式,'username'为用户名,'password' 为密码。

  • 创建数据库:

use DATABASE_NAME  如果数据库不存在,则创建数据库,否则切换到指定数据库。

  • 删除数据库:

db.dropDatabase() 删除当前数据库,默认为 test,你可以使用 db 命令查看当前数据库名。

  • MongoDB 创建集合

db.createCollection(name, options)

参数说明:

  • name: 要创建的集合名称
  • options: 可选参数, 指定有关内存大小及索引的选项

options 可以是如下参数:

字段

类型

描述

capped

布尔

(可选)如果为 true,则创建固定集合。固定集合是指有着固定大小的集合,当达到最大值时,它会自动覆盖最早的文档。

当该值为 true 时,必须指定 size 参数。

autoIndexId

布尔

3.2 之后不再支持该参数。(可选)如为 true,自动在 _id 字段创建索引。默认为 false。

size

数值

(可选)为固定集合指定一个最大值,即字节数。

如果 capped  true,也需要指定该字段。

max

数值

(可选)指定固定集合中包含文档的最大数量。

在插入文档时,MongoDB 首先检查固定集合的 size 字段,然后检查 max 字段。

  • 删除集合:

db.collection.drop()

  • 插入文档:

MongoDB 使用 insert() 或 save() 方法向集合中插入文档,语法如下:

db.COLLECTION_NAME.insert(document)

db.COLLECTION_NAME.save(document)

  • save():如果 _id 主键存在则更新数据,如果不存在就插入数据。该方法新版本中已废弃,可以使用 db.collection.insertOne() 或 db.collection.replaceOne() 来代替。
  • insert(): 若插入的数据主键已经存在,则会抛 org.springframework.dao.DuplicateKeyException 异常,提示主键重复,不保存当前数据。

db.collection.insertOne() 用于向集合插入一个新文档,语法格式如下:

db.collection.insertOne(

   <document>,

   {

      writeConcern: <document>

   }

)

db.collection.insertMany() 用于向集合插入一个多个文档,语法格式如下:

db.collection.insertMany(

   [ <document 1> , <document 2>, ... ],

   {

      writeConcern: <document>,

      ordered: <boolean>

   }

)

参数说明:

  • document:要写入的文档。
  • writeConcern:写入策略,默认为 1,即要求确认写操作,0 是不要求。
  • ordered:指定是否按顺序写入,默认 true,按顺序写入。

  • 更新文档:

MongoDB 使用 update() 和 save() 方法来更新集合中的文档。

update() 方法

update() 方法用于更新已存在的文档。语法格式如下:

db.collection.update(

   <query>,

   <update>,

   {

     upsert: <boolean>,

     multi: <boolean>,

     writeConcern: <document>

   }

)

参数说明:

  • query : update的查询条件,类似sql update查询内where后面的。
  • update : update的对象和一些更新的操作符(如$,$inc...)等,也可以理解为sql update查询内set后面的
  • upsert : 可选,这个参数的意思是,如果不存在update的记录,是否插入objNew,true为插入,默认是false,不插入。
  • multi : 可选,mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新。
  • writeConcern :可选,抛出异常的级别。

save() 方法

save() 方法通过传入的文档来替换已有文档,_id 主键存在就更新,不存在就插入。语法格式如下:

db.collection.save(

   <document>,

   {

     writeConcern: <document>

   }

)

参数说明:

  • document : 文档数据。
  • writeConcern :可选,抛出异常的级别。

  • 删除文档:

remove() 方法的基本语法格式如下所示:

db.collection.remove(

   <query>,

   <justOne>

)

如果你的 MongoDB 是 2.6 版本以后的,语法格式如下:

db.collection.remove(

   <query>,

   {

     justOne: <boolean>,

     writeConcern: <document>

   }

)

参数说明:

  • query :(可选)删除的文档的条件。
  • justOne : (可选)如果设为 true 或 1,则只删除一个文档,如果不设置该参数,或使用默认值 false,则删除所有匹配条件的文档。
  • writeConcern :(可选)抛出异常的级别。

  • 查询文档:

语法

MongoDB 查询数据的语法格式如下:

db.collection.find(query, projection)

  • query :可选,使用查询操作符指定查询条件
  • projection :可选,使用投影操作符指定返回的键。查询时返回文档中所有键值, 只需省略该参数即可(默认省略)。

如果你需要以易读的方式来读取数据,可以使用 pretty() 方法,语法格式如下:

>db.col.find().pretty()

pretty() 方法以格式化的方式来显示所有文档

Skynet queue:

有时候为了保持处理客户端发送信息的顺序性 就需要用到skynet.queue

语法:

local queue = require “skynet.queue”

Local xx  = queue() —获取一个执行队列 返回值为一个 closure

xx(f,…) —将函数f以及参数丢进队列之中

源码为闭包函数

 类似资料: