skynet是基于C做任务调度和消息传递,
基于lua做业务(消息)处理的框架。
只要使用的平台c编译器支持c99,就能够编译运行skynet。
所以理论上skynet可以运行在嵌入式领域的。
虽然skynet非常好用,但是skynet却不太好入门。
如果不充分理解skynet的原理,那么会提高不少的维护成本。
因此编写此文档旨在更了解skynet的原理。
关于skynet框架的C语言部分网上已经很多了,不多赘述。
对我个人帮助最大的是该篇:http://t.zoukankan.com/still-smile-p-15675806.html
本文主要介绍skynet中服务部分的功能介绍和实现原理。
本文将按服务的启动顺序依次介绍。
其中snlua服务是专用于创建lua服务的,
是lua服务的核心和基础,也是skynet框架的核心所在,
因此会着重讲解。
希望阅读者能够耐心阅读lua服务。
讲解服务前说明下我将按照一下几个方向去讲解:
创建snlua服务实例时,会调用snlua_create()函数创建snlua模块的实例。
snlua_create()中还会创建一个lua状态机并保存snlua的实例中。
struct snlua *
snlua_create(void) {
struct snlua * l = skynet_malloc(sizeof(*l));
memset(l,0,sizeof(*l));
l->mem_report = MEMORY_WARNING_REPORT;
l->mem_limit = 0;
l->L = lua_newstate(lalloc, l); // 此处创建并保存lua状态机
l->activeL = NULL;
ATOM_INIT(&l->trap , 0);
return l;
}
问题:为什么不能在init中运行脚本呢?
回答:snlua_init()中运行lua脚本时,是可能收到消息的。
snlua_init()所在运行线程持有了lua状态机,并在运行中。
收到消息时,消息处理线程也会去运行lua状态机,
那么就产生冲突了。
int
snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
int sz = strlen(args);
char * tmp = skynet_malloc(sz);
memcpy(tmp, args, sz);
// 设置一个临时消息监听函数launch_cb
skynet_callback(ctx, l , launch_cb);
// NULL表示获取服务的handleid的字符串形式
const char * self = skynet_command(ctx, "REG", NULL);
uint32_t handle_id = strtoul(self+1, NULL, 16);
// it must be first message
// 给自己发消息,立刻触发launch_cb,PTYPE_TAG_DONTCOPY表示tmp按指针传递
skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
return 0;
}
由于snlua_init()给自己发消息,因此launch_cb被触发调用,处理消息。
static int
launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
assert(type == 0 && session == 0);
struct snlua *l = ud;
-- 清空模块的消息监听函数
skynet_callback(context, NULL, NULL);
-- 初始化lua状态机,并运行lua状态机加载loader.lua脚本
int err = init_cb(l, context, msg, sz);
if (err) {
skynet_command(context, "EXIT", NULL);
}
return 0;
}
替换lua原生的协程唤醒函数coroutine.resume()和执行函数coroutine.wrap()
为skynet.profile的luaB_resume()和luaB_wrap(),
“skynet.profile”,貌似还能用于性能分析。
初始化完状态机,将config配置的路径设置为全局变量,并运行loader.lua脚本。
(config配置的环境变量存在一个全局的lua状态机中,
详见skynet-src/skynet_env.c中,为了存环境变量创建一个lua状态机有点浪费了)
LUA_PATH = config.lua_path or "./lualib/?.lua;./lualib/?/init.lua"
LUA_CPATH = config.lua_cpath or "./luaclib/?.so"
LUA_SERVICE = config.luaservice or "./service/?.lua"
LUA_PRELOAD = config.preload -- 无默认值
具体如下所示:
-- 假设当前脚本名为:a.lua
print "我最先被打印"
require.init(function()
print "当 require(\"a\")成功时,require返回前我将会被打印"
end)
-- skynet/require.lua
local M = {}
-- 主协程调用lua原生require,第一次加载skynet.require脚本时会保存主协程
local mainthread, ismain = coroutine.running()
-- 第一次加载非主协程会崩溃
assert(ismain, "skynet.require must initialize in main thread")
-- 协程用来保存注册f的数组,f就是通过require.init(f)注册的
local context = {
[mainthread] = {},
}
do
-- 此时的_G.require还是原生的,将原生的require通过闭包的形式存在局部变量require中
local require = _G.require
-- 已经成功加载的脚本保存在package.loaded中
local loaded = package.loaded
-- 加载中的脚本会暂存在loading中
local loading = {}
function M.require(name)
-- 已加载过的脚本直接返回
local m = loaded[name]
if m ~= nil then
return m
end
-- 如果是主协程,调用原生的require加载脚本,不过被加载脚本中的require早就被替换成skynet.require了
local co, main = coroutine.running()
if main then
return require(name)
end
-- 以下部分都是在子协程中运行
-- 以下部分都是在子协程中运行
-- 以下部分都是在子协程中运行
local filename = package.searchpath(name, package.path)
if not filename then
return require(name)
end
-- 有些不是文件形式的,而是内嵌lua当中的模块
local modfunc = loadfile(filename)
if not modfunc then
return require(name)
end
local loading_queue = loading[name]
if loading_queue then
-- 如果临时的加载中队列已存在,说明脚本还是加载中,不是加载完成形态
-- 1. 如果又回到这里,又是当前协程,那么说明形成了递归require,a脚本require(b) -> b脚本又require(a)
-- 2. 因为脚本未登记在loaded中,再次require 同个脚本,就会形成死循环。因此这里做个判断,直接报错
assert(loading_queue.co ~= co, "circular dependency")
-- Module is in the init process (require the same mod at the same time in different coroutines) , waiting.
local skynet = require "skynet"
loading_queue[#loading_queue+1] = co
-- 加载中又回到这里,不是同一个协程,可能是之前的子协程yield了,当前协程等待一会
skynet.wait(co)
-- 如果协程等待结束,该脚本还没加载好,直接报错,加载好了直接返回
local m = loaded[name]
if m == nil then
error(string.format("require %s failed", name))
end
return m
end
-- 创建临时加载队列
loading_queue = {co = co}
-- 标志脚本正在加载中
loading[name] = loading_queue
-- 保存老的init_list,因为如果调用require的当前脚本未加载完成时,可能已经调用require.init(f)了,那么init_list就不为空了
-- 得等到调用require的当前脚本加载结束后,再执行老得init_list
local old_init_list = context[co]
local init_list = {}
context[co] = init_list
-- We should call modfunc in lua, because modfunc may yield by calling M.require recursive.
local function execute_module()
-- 协程运行脚本时,脚本可能会一开始就调用yield, 那么此时modfunc就没法返回.
local m = modfunc(name, filename)
for _, f in ipairs(init_list) do
f()
end
if m == nil then
m = true
end
loaded[name] = m
end
local ok, err = xpcall(execute_module, debug.traceback)
context[co] = old_init_list
local waiting = #loading_queue
if waiting > 0 then
local skynet = require "skynet"
for i = 1, waiting do
-- 唤醒之前等在加载脚本的子协程
skynet.wakeup(loading_queue[i])
end
end
loading[name] = nil
if ok then
return loaded[name]
else
error(err)
end
end
end
-- 调用所有初始化函数,
function M.init_all()
for _, f in ipairs(context[mainthread]) do
f()
end
context[mainthread] = nil
end
-- 给当前lua文件注册初始化函数
function M.init(f)
assert(type(f) == "function")
local co = coroutine.running()
table.insert(context[co], f)
end
return M
loader.lua运行初始化相关环境后,就会运行服务脚本。
那么服务脚本中又是如何启动脚本的呢?
通过调用skynet.start(f)注册服务初始化函数。
启动服务后,服务是如何处理消息的呢?
通过调用skynet.dispatch(f)注册消息处理函数。
function skynet.init_service(start)
local function main()
-- 让skynet.timeout()通过创建的子协程去执行所有注册主协程的脚本初始化函数
skynet_require.init_all()
start() -- 所有被require的脚本的初始化函数被调用后,skynet.start(f)的f才会被执行
end
local ok, err = xpcall(main, traceback)
if not ok then
skynet.error("init service failed: " .. tostring(err))
skynet.send(".launcher","lua", "ERROR")
skynet.exit()
else
-- 重要的事说三遍
-- 回复launcher服务,告知当前服务已经初始化完成了。
-- 回复launcher服务,告知当前服务已经初始化完成了。
-- 回复launcher服务,告知当前服务已经初始化完成了。
skynet.send(".launcher","lua", "LAUNCHOK") -- 回复launcher服务,告知当前服务已经初始化完成了。
end
end
function skynet.start(start_func)
-- 设置监听消息的回调函数,此回调函数收到消息再分发,服务通过skynet.dispatch(...)监听分发的消息
c.callback(skynet.dispatch_message)
-- skynet.timeout()是一个异步执行的函数
init_thread = skynet.timeout(0, function() -- 创建一个协程来运行这个func。
-- 已经处于一个新的协程中,即便程序崩坏,该服务的主协程不受影响
skynet.init_service(start_func) -- 通过xpcall运行start_func,如果运行失败会发消息给.launcher服务
init_thread = nil
end)
end
-- skynet.timeout()是一个异步执行的函数
function skynet.timeout(ti, func)
local session = c.intcommand("TIMEOUT",ti) -- 发送0延时定时器消息,然后返回一个session
assert(session)
local co = co_create_for_timeout(func, ti) -- 创建一个协程
assert(session_id_coroutine[session] == nil)
-- 将协程登记到session_id_coroutine这个表中,等定时器消息回来,根据定时器携带的session唤醒新协程
session_id_coroutine[session] = co
return co -- for debug
end
通过调用skynet.dispatch(type, f),可以注册一个消息处理函数。
function skynet.dispatch(typename, func)
local p = proto[typename]
if func then
local ret = p.dispatch
p.dispatch = func
return ret
else
return p and p.dispatch
end
end
那么消息是怎么从C中传递这个回调中的呢?
在skynet.start(f)中,调用了c.callback(skynet.dispatch_message),
c.callback(f)将skynet.dispatch_message()函数设置为处理消息的总入口函数。
然后再由skynet.dispatch_message()将不同type的消息分发到skynet.dispatch(type, f)的回调f中。
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then -- 如果是返回类型的消息,根据session查找协程,并唤醒协程
local co = session_id_coroutine[session]
if co == "BREAK" then
session_id_coroutine[session] = nil
elseif co == nil then
unknown_response(session, source, msg, sz)
else
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "resume") end
session_id_coroutine[session] = nil
suspend(co, coroutine_resume(co, true, msg, sz, session)) -- 唤醒协程
end
else -- 此处表示别的服务发送给当前服务的消息
local p = proto[prototype]
if p == nil then -- 协议类型未注册
if prototype == skynet.PTYPE_TRACE then
-- trace next request
trace_source[source] = c.tostring(msg,sz)
elseif session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, prototype)
end
return
end
local f = p.dispatch
if f then
local co = co_create(f) -- 创建一个协程
session_coroutine_id[co] = session -- 记录session
session_coroutine_address[co] = source -- 记录源服务地址
local traceflag = p.trace
if traceflag == false then
-- force off
trace_source[source] = nil
session_coroutine_tracetag[co] = false
else
local tag = trace_source[source]
if tag then
trace_source[source] = nil
c.trace(tag, "request")
session_coroutine_tracetag[co] = tag
elseif traceflag then
-- set running_thread for trace
running_thread = co
skynet.trace()
end
end
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) -- 唤醒新协程处理消息
else
trace_source[source] = nil
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "") -- 给源服务发送错误消息。
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
end
end
function skynet.dispatch_message(...)
-- 所有类型的消息都会进这里
local succ, err = pcall(raw_dispatch_message,...) -- 调用raw_dispatch_message进行分发
while true do
-- 调用skynet.fork创建出来的协程。自己创建的协程没法被skynet框架统一管理,所以需要使用skynet.fork创建
if fork_queue.h > fork_queue.t then
-- queue is empty
fork_queue.h = 1
fork_queue.t = 0
break
end
-- pop queue
local h = fork_queue.h
local co = fork_queue[h]
-- 取出一个协程执行,然后就销毁了。skynet.fork只创建一次协程
fork_queue[h] = nil
fork_queue.h = h + 1
-- 启动运行skynet.fork(f)创建出来的协程
local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
if not fork_succ then
if succ then
succ = false
err = tostring(fork_err)
else
err = tostring(err) .. "\n" .. tostring(fork_err)
end
end
end
assert(succ, tostring(err))
end
实现流程:
问题:为什么lua服务启动要在bootstrap.lua脚本中进行,不在c中进行呢?
回答:
服务名:launcher
context:snlua
代码路径:service/launcher.lua
功能:
使用:
实现功能:
launcher服务用于提供一套确保创建服务初始化完成才返回的机制。
当launcher服务起来以后,skynet.newservice()接口就能正常工作了。
解释:在skynet中,如果没有launcher服务,需要调用skynet.launch去创建服务,譬如launcher服务本身就是通过skynet.launch(“snlua”, “launcher”)去创建的。
不过既然已经有skynet.launch创建服务了,为什么还要创建一个launcher服务去支持skynet.newserivce创建服务呢?
原因:服务初始化过程中,是异步的。其中至少有两个异步过程,因此如果在skynet.launch()返回之后,立刻给服务发送消息,
服务是有可能出错的。
服务是有可能出错的。
服务是有可能出错的。
第一个异步过程:snlua_init()是通过一个初始化消息来进行下一个阶段的初始化(触发launch_cb)。那么c.command()就会立刻返回。
如果此时发送一个业务消息给服务,并不是launch_cb处理消息。launch_cb优先处理第一个初始化消息。业务消息还在消息队列中。
第二个异步过程:skynet.start()
skynet.start(f)通过skynet.timeout(f)来使用定时器消息进行异步执行f。
如果f的实现如下,那么因为业务消息已经在消息队列中了,业务消息可能先于定时器执行。
那么dispatch就没有机会执行了,也就无法注册消息处理函数。
skynet.start(
function() -- function延迟执行,function未执行,导致skynet.dispatch未执行,业务消息过来时就会异常。
skynet.dispatch("lua", function(session, address, cmd)
-- 处理消息
end)
end
)
我们看看dispatch的实现,注册的协议类型存在proto集合中。
function skynet.dispatch(typename, func)
local p = proto[typename]
if func then
local ret = p.dispatch
p.dispatch = func -- 回调注册
return ret
else
return p and p.dispatch
end
end
我们再看看消息过来时是如何处理的:
function skynet.start(start_func)
-- 此时注册了消息处理函数,消息一过来,一定会调用skynet.disptach_message()
c.callback(skynet.dispatch_message)
init_thread = skynet.timeout(0, function()
skynet.init_service(start_func)
init_thread = nil
end)
end
-- skynet.dispatch_message调用raw_dispatch_message处理消息
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
...
else
local p = proto[prototype]
if p == nil then -- ****因为没有注册,所以p为nill****
if prototype == skynet.PTYPE_TRACE then
-- trace next request
trace_source[source] = c.tostring(msg,sz)
elseif session ~= 0 then
-- 如果session不为0,一般是业务消息,业务消息往回发
c.send(source, skynet.PTYPE_ERROR, session, "")
else
-- session为0,自己发给自己?总之就报错
unknown_request(session, source, msg, sz, prototype)
end
return
end
-- 以下应该也不会执行到的
...
end
end
所以我们能看到有些服务是下面这么初始化的,为了避免前面那种情况。launcher服务本身也是这么初始化的。
skynet.dispatch("lua", function (session, address, cmd, ...) end)
skynet.start(function end)
问题:那么launcher服务是怎么避免这个问题的呢?
解决:通过发消息给launcher服务创建服务,等被创建服务器真正初始化好了以后,
launcher服务才返回一个初始化完成的消息。
而创建服务的服务监听到初始化完成的消息后,再继续向下执行。
function skynet.newservice(name, ...)
-- 发送一个创建服务的消息给launcher服务
return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end
-- launcher.lua
local function launch_service(service, ...)
local param = table.concat({...}, " ")
-- 创建服务,当服务创建完成会返回LAUNCHOK的消息
local inst = skynet.launch(service, param)
-- 获取当前消息处理对应的session
local session = skynet.context()
-- 创建一个返回函数,response(true):返回成功给上级服务,response(false):返回失败给上级服务
local response = skynet.response()
if inst then
services[inst] = service .. " " .. param
-- 存起来,QUERY命令可以查询正在创建什么服务,我感觉没什么卵用,可能时历史遗留
instance[inst] = response
-- 存起来,QUERY命令可以查询正在创建什么服务,我感觉没什么卵用,可能历史遗留
launch_session[inst] = session
else
response(false)
return
end
return inst
end
-- 处理LAUNCH命令
function command.LAUNCH(_, service, ...)
launch_service(service, ...)
return NORET
end
-- skynet.lua
function skynet.response(pack)
pack = pack or skynet.pack
--当前消息的session
local co_session = assert(session_coroutine_id[running_thread], "no session")
session_coroutine_id[running_thread] = nil
-- 当前消息的源地址
local co_address = session_coroutine_address[running_thread]
if co_session == 0 then
-- do not response when session == 0 (send)
return function() end
end
local function response(ok, ...) -- 此处使用了闭包
if ok == "TEST" then
return unresponse[response] ~= nil
end
if not pack then
error "Can't response more than once"
end
local ret
if unresponse[response] then
if ok then
-- 异步发送 “返回成功”
ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, pack(...))
if ret == false then
-- If the package is too large, returns false. so we should report error back
c.send(co_address, skynet.PTYPE_ERROR, co_session, "") -- 异步发送返回失败
end
else
ret = c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
end
unresponse[response] = nil
ret = ret ~= nil
else
ret = false
end
pack = nil
return ret
end
unresponse[response] = co_address
return response
end
服务创建完成,返回LAUNCHOK消息
-- skynet.lua
function skynet.init_service(start)
local function main()
-- 让skynet.start()创建的子协程去执行所有注册主协程的脚本初始化函数
skynet_require.init_all()
-- 所有被require的脚本的初始化函数被调用后,skynet.start(f)的f才会被执行
start()
end
local ok, err = xpcall(main, traceback)
if not ok then
skynet.error("init service failed: " .. tostring(err))
skynet.send(".launcher","lua", "ERROR")
skynet.exit()
else
-- 回复launcher服务,告知当前服务已经初始化完成了。skynet.send是异步结束的,call是协程同步的
skynet.send(".launcher","lua", "LAUNCHOK")
end
end
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
init_thread = skynet.timeout(0, function()
skynet.init_service(start_func) --初始化服务
init_thread = nil
end)
end
处理LAUNCHOK消息
-- launcher.lua
function command.LAUNCHOK(address)
-- init notice
local response = instance[address]
if response then
response(true, address) -- 发送返回消息。
instance[address] = nil
launch_session[address] = nil
end
return NORET
end
服务名:cmaster
context:snlua
代码路径:service/cmaster.lua
功能:
创建tcp服务器,收到客户端链接后,接收消息并转发。
如果我们移植到嵌入式中,不监听外部请求的话,是可以删除cmaster服务的。
使用:
实现:
cmaster服务基于socket模块所实现。
而socket模块是采用pipe+epoll
cmaster服务的初始化要从socket初始化说起。socket初始化函数是skynet_socket_init()。
当skynet框架初始化的时候,skynet-src/skynet_start.c中的skynet_start()将会调用skynet_socket_init()。
skynet_socket_init()在skynet-src/skynet_socket.c文件中实现的。
skynet_start()->skynet_socket_init()->socket_server_create()
poll_fd efd = sp_create();
ss->event_fd = efd;
sp_add(efd, fd[0], NULL);
// 读消息
ss->recvctrl_fd = fd[0];
// 发消息
ss->sendctrl_fd = fd[1];
ss->reserve_fd = dup(1); // reserve an extra fd for EMFILE
初始化我们提到我们创建了epoll套接字。
那么socket模块运行就是创建一个线程不停轮询监听epoll套接字池中的事件响应。
// 此函数位于代码:skynet-src/skynet_start.c
static void *
thread_socket(void *p) {
struct monitor * m = p;
// 给当前线程设置一个私有值THREAD_SOCKET
skynet_initthread(THREAD_SOCKET);
for (;;) {
// epoll 监听并处理消息
// r == 1表示监听到消息,并正常处理,并将消息丢到对应的服务的消息队列中。
int r = skynet_socket_poll();
if (r==0)
break;
if (r<0) {
CHECK_ABORT
continue;
}
// 唤醒N个专门处理消息的线程,线程会通过消息确定哪个服务,然后执行服务的callback函数处理消息
// 这里很奇怪,skynet_socket_poll()推送消息时也会唤醒线程,为什么这里还要再次唤醒呢?晚点确认原因
wakeup(m,0);
}
return NULL;
}
// 此函数位于代码:skynet-src/skynet_socket.c
int
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
// 一旦监听到任何消息,more都会为0,从而导致当前函数返回1.
// 将监听到的消息包装为struct socket_message result;
int type = socket_server_poll(ss, &result, &more);
// forward_message()是将当前struct socket_message类型的result消息
// 转化/提炼/封装成struct skynet_socket_message,然后发送给result->opaque(服务的handleid)对应的服务
// forword_message的第一个参数作用,详情看lublib/skynet/socket.lua中的socket_message函数数组
// 第一个参数如SKYNET_SOCKET_TYPE_CONNECT,就是其数组索引。根据索引调用相应的消息处理函数,并唤醒协程。
switch (type) {
case SOCKET_EXIT:
return 0;
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
case SOCKET_CLOSE:
forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
break;
case SOCKET_OPEN:
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
case SOCKET_ERR:
forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
break;
case SOCKET_ACCEPT:
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
break;
case SOCKET_UDP:
forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
break;
case SOCKET_WARNING:
forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
break;
default:
skynet_error(NULL, "Unknown socket message type %d.",type);
return -1;
}
if (more) {
return -1;
}
//
return 1;
}
// 此函数位于代码:skynet-src/socket_server.c
// return type
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->checkctrl) {
// 判断是否有管道数据
if (has_cmd(ss)) {
int type = ctrl_cmd(ss, result);
if (type != -1) {
clear_closed_event(ss, result, type);
return type;
} else
continue;
} else {
// 既不是管道消息,也不是socket消息唤醒epoll。其他原因不明
ss->checkctrl = 0;
}
}
// event_index表示当前处理的事件的索引
// event_n表示事件总数
if (ss->event_index == ss->event_n) {
// epoll等待事件,sp_wait()将所有事件填充到ss->ev当中,并返回事件个数
// 此处需关注struct event结构体成员,read/write/error/eof 表示当时注册套接字的事件类型
// s成员是注册套接字时的注册数据,也就是struct socket
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
ss->checkctrl = 1;
if (more) {
*more = 0;
}
ss->event_index = 0;
if (ss->event_n <= 0) {
ss->event_n = 0;
int err = errno;
if (err != EINTR) {
skynet_error(NULL, "socket-server: %s", strerror(err));
}
continue;
}
}
// 取出一个事件进行处理
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
// 如果s为空,说明不是网络消息, 应该是内部的管道消息,调用sp_add()注册管道套接字时,数据为NULL。
// 管道套接字监听代码看skynet-src/socket_server.c:socket_server_create()
if (s == NULL) {
// dispatch pipe message at beginning
continue;
}
struct socket_lock l;
socket_lock_init(s, &l);
switch (ATOM_LOAD(&s->type)) {
/**
* 1. 只有cslave的slave fd对应的struct socket的type可能是SOCKET_TYPE_CONNECTING
* 2. 当cslave服务调用socket.open()之后,创建了一个client_fd1(socket),并设置设置为非阻塞的状态(fcntl(fd, F_GETFL, 0))。
* connect可能第一时间返回-1。如果errno为EINPROGRESS则表示connect确实出错。
* errno为其他值为正常,此时将client_fd1对应的struct socket的type设置为SOCKET_TYPE_CONNECTING.
* 3. 当cmaster的listen_fd进行accept则会获取到一个client_fd2,并将client_fd2设置为SOCKET_TYPE_PACCEPT状态.
* 4. 然后cmaster.lua中拿到client_fd2后,会调用socket.start(client_fd2),将其状态由SOCKET_TYPE_PACCEPT修改为SOCKET_TYPE_CONNECTED
* 5. client_fd1是enable_write,client_fd2是enable_read.
* 6. report_connect会通过getsockopt(client_fd1, SOL_SOCKET, SO_ERROR)判断是cslave发起链接是成功还是异常,然后将结果上报
*/
case SOCKET_TYPE_CONNECTING:
return report_connect(ss, s, &l, result);
/*
* 1. 只有cmaster的listen_fd对应的struct socket的type可能是SOCKET_TYPE_LISTEN,
* 2. cmaster服务调用socket.listen()之后,listen_fd的type置为SOCKET_TYPE_PLISTEN。
* 3. cmaster服务调用socket.start()之后,fd的type会置为SOCKET_TYPE_LISTEN。
* 表示可以正式监听客户端的请求了
* 4. listen_fd被epoll唤醒,只能说明有客户端发起链接请求了。
* 因此需要accept处理链接请求。
*/
case SOCKET_TYPE_LISTEN: {
int ok = report_accept(ss, s, result);
if (ok > 0) {
return SOCKET_ACCEPT;
} if (ok < 0 ) {
return SOCKET_ERR;
}
// when ok == 0, retry
break;
}
case SOCKET_TYPE_INVALID:
skynet_error(NULL, "socket-server: invalid socket");
break;
/**
* 此处的e->type可能是SOCKET_TYPE_CONNECTED,可直接进行收发消息。
*/
default:
// read表示有消息过来,要读
if (e->read) {
int type;
if (s->protocol == PROTOCOL_TCP) {
// 读消息,Buffer没填满表示读完了,buffer填满了表示没读完
type = forward_message_tcp(ss, s, &l, result);
if (type == SOCKET_MORE) {
// 回退event_index, 继续读数据
--ss->event_index;
return SOCKET_DATA;
}
} else {
type = forward_message_udp(ss, s, &l, result);
if (type == SOCKET_UDP) {
// try read again
--ss->event_index;
// 上报数据
return SOCKET_UDP;
}
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
// Try to dispatch write message next step if write flag set.
e->read = false;
--ss->event_index;
}
if (type == -1)
break;
return type;
}
/**
* 如果可写(客户端的套接字一般都可写)
* 如果由数据则发送数据
* 有时候调用send时返回-1,可以等下次触发EPOLLOUT,再执行到这里,然后再发送
*/
if (e->write) {
int type = send_buffer(ss, s, &l, result);
if (type == -1)
break;
return type;
}
if (e->error) {
int error;
socklen_t len = sizeof(error);
int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);
const char * err = NULL;
if (code < 0) {
err = strerror(errno);
} else if (error != 0) {
err = strerror(error);
} else {
err = "Unknown error";
}
return report_error(s, result, err);
}
// 主动关闭和被动关闭
if (e->eof) {
// For epoll (at least), FIN packets are exchanged both ways.
// See: https://stackoverflow.com/questions/52976152/tcp-when-is-epollhup-generated
int halfclose = halfclose_read(s);
force_close(ss, s, &l, result);
// halfclose为False,应该是客户端主动断开,所以返回SOCKET_CLOSE,服务端也断开。否则是服务端主动断开,不需要范围SOCKET_CLOSE
if (!halfclose) {
return SOCKET_CLOSE;
}
}
break;
}
}
}
cmaster文件依赖:
service/cmaster.lua ->
lualib/skynet/socket.lua ->
lualib/compat10/socketdriver.lua ->
lualib-src/lua-socket.c ->
skynet-src/skynet_socket.c ->
skynet-src/socket_server.c
skynet不同目录表示不同的大模块,从文件依赖,我们可以看出skynet框架内部的模块大致依赖关系:
service |
---|
lualib |
lualib-src |
skynet-src |
skynet.start(function()
local master_addr = skynet.getenv "standalone"
skynet.error("master listen socket " .. tostring(master_addr))
local fd = socket.listen(master_addr) -- 1. socket.listen()
// 对服务端的fd调用socket.start,会将fd对应的状态设置为SOCKET_TYPE_LISTEN,该状态只能监听套接字
socket.start(fd , function(id, addr) --2. socket.start()
-- 3. 回调触发
skynet.error("connect from " .. addr .. " " .. id)
// 对客户端的fd调用socket.start,会将fd对应的状态设置为SOCKET_TYPE_CONNECTED,可以进行读写
socket.start(id)
-- 4. handshake():和客户端握手,握手成功后会通知所有其他的客户端(harbor)
local ok, slave, slave_addr = pcall(handshake, id)
if ok then
-- 5. skynet.fork(): 握手成功,创建一个协程。
-- 就像前面说的,fork创建的协程会在下次收到skynet框架的消息后运行monitor_slave(仅一次)
-- monitor_slave轮询等待消息,对当前的harbor客户端提供查询和注册服务机制
skynet.fork(monitor_slave, slave, slave_addr)
else
skynet.error(string.format("disconnect fd = %d, error = %s", id, slave))
socket.close(id)
end
end)
end)
cmaster.lua:socket.listen(master_addr)
-> socket.lua:driver.listen(host, port, backlog)
-> lua-socket.c:llisten(lua_State*L)
-> skynet_socket.c:skynet_socket_listen(ctx, host, port, backlog)
-> socket_server.c:socket_server_listen(ctx, host, port, backlog)
// return -1 means failed
// or return AF_INET or AF_INET6
static int
do_bind(const char *host, int port, int protocol, int *family) {
int fd;
int status;
int reuse = 1;
struct addrinfo ai_hints;
struct addrinfo *ai_list = NULL;
char portstr[16];
if (host == NULL || host[0] == 0) {
host = "0.0.0.0"; // INADDR_ANY
}
sprintf(portstr, "%d", port);
memset( &ai_hints, 0, sizeof( ai_hints ) );
// 协议无关,AF_INET6和AF_INET4都可
ai_hints.ai_family = AF_UNSPEC;
if (protocol == IPPROTO_TCP) {
ai_hints.ai_socktype = SOCK_STREAM;
} else {
assert(protocol == IPPROTO_UDP);
ai_hints.ai_socktype = SOCK_DGRAM;
}
ai_hints.ai_protocol = protocol;
// 参考:https://www.cnblogs.com/chinacloud/archive/2011/08/11/2135141.html
// 一般ai_hints.ai_flags设置AI_PASSIVE,用于bind()
// 但是即便ai_hints.ai_flags为0,只要host和port/serv_name设置没问题,也可以进行绑定。
// 然后使用返回值ai_list取进行绑定
status = getaddrinfo( host, portstr, &ai_hints, &ai_list );
if ( status != 0 ) {
return -1;
}
*family = ai_list->ai_family;
fd = socket(*family, ai_list->ai_socktype, 0); // 创建套接字
if (fd < 0) {
goto _failed_fd;
}
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int))==-1) { //地址重用
goto _failed;
}
status = bind(fd, (struct sockaddr *)ai_list->ai_addr, ai_list->ai_addrlen); // 绑定地址
if (status != 0)
goto _failed;
freeaddrinfo( ai_list );
return fd;
_failed:
close(fd);
_failed_fd:
freeaddrinfo( ai_list );
return -1;
}
static int
do_listen(const char * host, int port, int backlog) {
int family = 0;
int listen_fd = do_bind(host, port, IPPROTO_TCP, &family); // 绑定端口
if (listen_fd < 0) {
return -1;
}
if (listen(listen_fd, backlog) == -1) { // 设置backlog
close(listen_fd);
return -1;
}
return listen_fd;
}
/**
* opaque:服务的handleid,监听成功后,要通过handleid
*/
int
socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
int fd = do_listen(addr, port, backlog); // 端口绑定以及监听
if (fd < 0) {
return -1;
}
struct request_package request;
// 因为ss->slot[HASH_ID(id)],通过id可以定位一个struct socket
// 将struct socket的type设置为SOCKET_TYPE_RESERVE,是这个已经预定了,需要保留的意思。
// 后面将使用struct socket
int id = reserve_id(ss);
if (id < 0) {
close(fd);
return id;
}
request.u.listen.opaque = opaque;
request.u.listen.id = id;
request.u.listen.fd = fd; // 监听套接字
// 给管道发送消息,'L'表示
send_request(ss, &request, 'L', sizeof(request.u.listen));
return id;
}
context:snlua
代码路径:service/cslave.lua
实现功能:
context:harbor
代码路径:service-src/service_harbor.c
实现功能:
context:snlua
代码路径:service/datacenterd.lua
实现功能:
服务名:service
context:service_mgr
代码路径:service/service_mgr.lua
实现功能:
服务名:main
context:main
代码路径:examples/main.lua
实现功能:
服务名:protoloader服务
context:snlua
代码路径:examples/protoloader.lua
实现功能:
服务名:console服务
context:snlua
代码路径:service/console.lua
实现功能:
服务名:debug_console服务
context:snlua
代码路径:service/debug_console.lua
实现功能:
服务名:simpledb服务
context:snlua
代码路径:examples/simpledb.lua
实现功能:
服务名:watchdog服务
context:snlua
代码路径:examples/watchdog.lua
实现功能:
服务名:gate服务
context:snlua
代码路径:examples/gate.lua
实现功能: