因为做游戏服务器开发,大多数都跟脚本打交道,要么是lua,要么是python,要么是php,方便热更新的只有lua与php, php相关的游戏服务器开发,参考我另外的文章
https://blog.csdn.net/guoyilongedu/article/details/121049511
lua脚本相关的,自己也实现了一套,只不过不支持多线程,单进程的,基于libevent与luajit,上线过两款游戏项目,功能是挺全的,支持mysql,redis,定时任务相关。后面会贴出来,一起参考一下。
学skynet,因为skynet太出名了,是所有游戏服务器开发人员都绕不过去的,有必要或多或少熟悉一些,最好能自己动手实操一下,才能真正感受到skynet的魅力。
https://github.com/cloudwu/skynet
下载zip 文件,解压
cd skynet #进入skynet目录
make linux #编译
注意 因为skynet 用到c++11 相关的特性,所以gcc 必须支持C++11,所以版本必须大于4.9
初始怎么用 参考这篇文章
https://blog.csdn.net/qq_37717687/article/details/121766657
我们要关注的是游戏服务器的搭建,所以第一步 我们要关注的skynet 网络功能
之前用上面链接里面的一个echo 例子 试了一下,发现 一个问题 ,读取出来的数据包不完整 ,代码如下
local skynet = require "skynet"
local socket = require "skynet.socket"
local clients = {}
function connect(fd, addr)
--启用连接,开始等待接收客户端消息
print(fd .. " connected addr:" .. addr)
socket.start(fd)
clients[fd] = {}
--消息处理
while true do
local readdata = socket.read(fd) --利用协程实现阻塞模式
--正常接收
if readdata ~= nil then
print(fd .. " recv " .. readdata)
for k,v in pairs(clients) do --广播
socket.write(k, readdata)
end
--断开连接
else
print(fd .. " close ")
socket.close(fd)
clients[fd] = nil
end
end
end
skynet.start(function()
local listenfd = socket.listen("0.0.0.0", 8888) --监听所有ip,端口8888
socket.start(listenfd, connect) --新客户端发起连接时,conncet方法将被调用。
end)
这就很要命,研究了一下,是因为 skynet 的网络消息格式是固定的 2字节头(消息长度)+消息内容。
skynet 网络相关的是流式读取,也就是尝试读取的,一个数据包过来,可能要读好几次,至于为什么这么设计,可能是与内存池有关,内存池的分配都是按块的,所以才分批读取,后面单独再研究
单个socket每次从内核尝试读取的数据字节数为sz(第6行),这个值保存在s->p.size中,初始是MIN_READ_BUFFER(64b),当实际读到的数据等于sz时,sz扩大一倍(8-9行);如果小于sz的一半,则设置sz为原来的一半(10-11行)。
比如,客户端发了一个1kb的数据,socket线程会从内核里依次读取64b,128b,256b,512b,64b数据,总共需读取5次,即会向gateserver服务发5条消息,一个TCP包被切割成5个数据块。第5次尝试读取1024b数据,所以可能会读到其他TCP包的数据(只要客户端有发送其他数据)。接下来,客户端再发一个1kb的数据,socket线程只需从内核读取一次即可。
https://www.cnblogs.com/cnxkey/articles/15945319.html
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
int sz = s->p.size;
char * buffer = MALLOC(sz);
int n = (int)read(s->fd, buffer, sz);
if (n<0) {
FREE(buffer);
switch(errno) {
case EINTR:
break;
case AGAIN_WOULDBLOCK:
skynet_error(NULL, "socket-server: EAGAIN capture.");
break;
default:
// close when error
force_close(ss, s, l, result);
result->data = strerror(errno);
return SOCKET_ERR;
}
return -1;
}
if (n==0) {
FREE(buffer);
force_close(ss, s, l, result);
return SOCKET_CLOSE;
}
if (s->type == SOCKET_TYPE_HALFCLOSE) {
// discard recv data
FREE(buffer);
return -1;
}
stat_read(ss,s,n);
if (n == sz) {
s->p.size *= 2;
} else if (sz > MIN_READ_BUFFER && n*2 < sz) {
s->p.size /= 2;
}
result->opaque = s->opaque;
result->id = s->id;
result->ud = n;
result->data = buffer;
return SOCKET_DATA;
}
第一次读取的字节 就是 #define MIN_READ_BUFFER 64
每次读满之后,字节扩大一倍
if (n == sz) {
s->p.size *= 2;
}
了解了这里 发现 改成如下这样
local skynet = require "skynet"
local socket = require "skynet.socket"
local protobuf = require "protobuf"
local cjson = require "cjson"
local clients = {}
local logic_service = nil
-- skynet.register_protocol {
-- name = "client",
-- id = skynet.PTYPE_CLIENT,
-- unpack = skynet.tostring, --- 将C point 转换为lua 二进制字符串
-- }
function connect(fd, addr)
-- 启用连接 开始等待接收客户端消息
print(" fd = ".. fd .. " connected addr:" .. addr)
socket.start(fd)
clients[fd] = {}
-- 消息处理 不断循环接收
while true do
local readdata = socket.read(fd) -- 利用协程实现阻塞模式
-- 正常接收
if readdata ~= nil then
--local ret_data = "fd = " .. fd .. " ==> " .. string.byte(readdata, 2);
--readdata = string.unpack('>', readdata);
print("-----------> ret_data " .. readdata);
local s = readdata:byte(1) * 256 + readdata:byte(2) --数据包的长度
print(" s = " .. s)
local s1 = readdata:byte(3) * 256 + readdata:byte(4) --base64的长度
print(" s1 = " .. s1)
-- local real_data = readdata:sub(5, 100);
-- --local base64_data = string.unpack(">s2", real_data)
-- print("-----------> ret_data " .. real_data);
--readdata = skynet.tostring(readdata)
--local data_tb = protobuf.decode("cs.Person", readdata)
--print(data_tb)
--print(data_tb.name)
-- print(data_tb.id)
-- for _,v in ipairs(data_tb.phone) do
-- print("\t" .. v.number, v.type)
-- end
--print(fd .. " recv " .. readdata)
--local ret_data = "fd = " .. fd .. " ==> " .. readdata;
if logic_service ~= nil then
local msg_tb = {}
msg_tb['fd'] = fd
msg_tb['content'] = readdata;
local stringbuffer = protobuf.encode("cs.Person", {
name = "linsh",
id = 1,
email = readdata,
})
-- 发送protobuf 数据
local ret_msg_protobuf = skynet.call(logic_service, "lua", "doMsg_protobuf_data", stringbuffer)
--ret_data = "fd = " .. fd .. " ==> " .. ret_msg
local ret_tb = protobuf.decode("cs.Person", ret_msg_protobuf)
print(ret_tb.name)
print(ret_tb.id)
for _,v in ipairs(ret_tb.phone) do
print("\t" .. v.number, v.type)
end
ret_data = cjson.encode(ret_tb)
print(" ret msg from logic_service fd = " .. fd .. " ==> " .. ret_data)
end
if fd % 2 == 1 then
skynet.sleep(1000)
end
for k, v in pairs(clients) do
socket.write(k, ret_data)
end
-- 断开连接
else
print(fd .. "close")
socket.close(fd)
clients[fd] = nil
end
end
end
local CMD = {}
function CMD.broad_msg_to_user(source, msg_content)
print("=========> " .. msg_content)
for k, v in pairs(clients) do
socket.write(k, msg_content)
end
end
skynet.start(function()
logic_service = skynet.newservice("logic")
local listenfd = socket.listen("0.0.0.0", 8888) -- 监听所有ip,端口8888
socket.start(listenfd, connect) --新客户端发起连接时,conncet方法将被调用
protobuf.register_file "./examples/protos/Person.pb"
skynet.error("register:Person.pb")
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = assert(CMD[cmd])
-- f(source, ...)
f(source, ...)
end)
end)
发现读取时,还是不行, 读取出来的总是不完整,了解到skynet 是分批读取,必须这些分批读取拼接起来,组成一个完整的数据包,所以需要一个netpack的中间层 来拼接数据包
--[[
协议分析:长度信息法
skynet提供的C语言编写的netpack模块,它能高效解析2字节长度信息的协议
]]
local skynet = require "skynet"
local socketdriver = require "skynet.socketdriver"
local netpack = require "skynet.netpack"
--不能同时包含socketdriver和socket,因为socket里已经register_protocol了socket类型
-- local socket = require "skynet.socket"
local queue -- message queue
local clients = {}
--解码底层传来的SOCKET类型消息
function socket_unpack(msg, size)
return netpack.filter(queue, msg, size)
end
--处理底层传来的SOCKET类型消息
function socket_dispatch(_, _, q, type, ...)
skynet.error("socket_dispatch type:" .. (type or "nil"))
queue = q
if type == "open" then
process_connect(...)
elseif type == "data" then
process_msg(...)
elseif type == "more" then
process_more(...)
elseif type == "close" then
process_close(...)
elseif type == "error" then
process_error(...)
elseif type == "warning" then
process_warning(...)
end
end
--有新连接
function process_connect(fd, addr)
skynet.error("new conn fd:" .. fd .. " addr:" .. addr)
socketdriver.start(fd)
clients[fd] = {}
clients[fd].last_time = skynet.time()
end
--关闭连接
function process_close(fd)
skynet.error("close fd:" .. fd)
if clients[fd] then
clients[fd] = nil
end
end
--发送错误
function process_error(fd, error)
skynet.error("error fd:" .. fd .. " error:" .. error)
if clients[fd] then
clients[fd] = nil
end
end
--发送警告
function process_warning(fd, size)
skynet.error("warning fd:" .. fd .. " size:" .. size)
end
--刚好收到一条完整消息
function process_msg(fd, msg, size)
local str = netpack.tostring(msg, size)
skynet.error("recv from fd:" .. fd .. " str:" .. str .. " size = " .. size)
for k, v in pairs(clients) do
skynet.error(" all fd = " .. k)
if k ~= fd then
--socket.write(k, msg_content)
socketdriver.send(k, str)
end
end
if fd % 2 == 0 then
--skynet.sleep(1000)
end
socketdriver.send(fd, str)
clients[fd].last_time = skynet.time()
end
--收到多于1条消息时
function process_more()
for fd, msg, size in netpack.pop, queue do
print("process_more = fd = " .. fd .. " size = " .. size)
skynet.fork(process_msg, fd, msg, size) --开启协程,协程保证了process_msg执行的时序性
end
end
function check_clients(timeout)
while true do
skynet.sleep(timeout)
local now_time = skynet.time()
skynet.error(" check clients now time = " .. now_time)
local gap_time = 0
for k, v in pairs(clients) do
skynet.error(" all fd = " .. k .. " last_time = " .. v.last_time)
gap_time = now_time - v.last_time
if gap_time >= 30 then
socketdriver.close(k)
--clients[k] = nil
--socketdriver.shutdown(k)
end
--socketdriver.send(k, " hearbeat fd = " .. k)
end
end
end
skynet.start(function ()
--注册SOCKET类型信息
skynet.register_protocol({
name = "socket",
id = skynet.PTYPE_SOCKET,
unpack = socket_unpack,
dispatch = socket_dispatch,
})
--注册Lua类型消息
--开启监听
local listenfd = socketdriver.listen("0.0.0.0", 8887)
socketdriver.start(listenfd)
skynet.fork(check_clients, 3000)
end)
这样就能完整收包 解包
客户端测试代码 python脚本 有粘包 也能正常解
import socket
import threading
import struct
import sys
#import Person_pb2
global recvThreadExit
def recvMsg(socket_obj):
while True:
print(" recvThreadExit = " + str(recvThreadExit))
if recvThreadExit == 1:
break
ret = str(obj.recv(1024))
print("recv msg :")
print(ret)
if len(ret) == 0 :
#obj.shutdown(socket.SHUT_RDWR)
#obj.close()
print("close ==>")
break
# person = Person_pb2.Person()
# person.name = "ayuliao"
# person.id = 6
# person.email = "xxx@xx.com"
#person.phone = "13229483229"
# person_protobuf_data = person.SerializeToString()
#print(f'person_protobuf_data: {person_protobuf_data}')
recvThreadExit = 0
obj = socket.socket()
obj.connect(('192.168.1.200', 8887))
recv_thread = threading.Thread(target=recvMsg, args=(obj,))
recv_thread.start()
while True:
#a = raw_input()
#print(a)
inp = raw_input("input string: ").strip(' ')
if inp == "quit" :
#obj.shutdown(socket.SHUT_RDWR)
obj.close()
recvThreadExit = 1
print("sssssssssssssssssseeeeeeeeeee")
break
#print(inp)
##obj.sendall(bytes(inp, encoding="utf-8"))
print("len = " + str(len(inp)) )
#fmt_str = ">1h1h1h" + str(len(inp)) + "s"
fmt_str = ">1h" + str(len(inp)) + "s"
print(fmt_str)
send_data = struct.pack(fmt_str, len(inp), bytes(inp))
print(send_data + " len = " + str(len(send_data)) )
obj.send(send_data)
#obj.send(send_data)
#obj.send(send_data)
#obj.send(send_data)
#obj.send(send_data)
if inp == "q":
break
print(" exti ==>")
sys.exit(3)
网络的一个坑点就是这个,切不可拿其他人的示例来用,要自己多测试。