当前位置: 首页 > 工具软件 > Skynet > 使用案例 >

skynet 游戏服务器探索(1)--熟悉skynet(网络)

壤驷鸿
2023-12-01

因为做游戏服务器开发,大多数都跟脚本打交道,要么是lua,要么是python,要么是php,方便热更新的只有lua与php, php相关的游戏服务器开发,参考我另外的文章

https://blog.csdn.net/guoyilongedu/article/details/121049511

lua脚本相关的,自己也实现了一套,只不过不支持多线程,单进程的,基于libevent与luajit,上线过两款游戏项目,功能是挺全的,支持mysql,redis,定时任务相关。后面会贴出来,一起参考一下。

学skynet,因为skynet太出名了,是所有游戏服务器开发人员都绕不过去的,有必要或多或少熟悉一些,最好能自己动手实操一下,才能真正感受到skynet的魅力。

https://github.com/cloudwu/skynet

下载zip 文件,解压

cd skynet    #进入skynet目录
make linux    #编译

注意 因为skynet 用到c++11 相关的特性,所以gcc 必须支持C++11,所以版本必须大于4.9

初始怎么用 参考这篇文章

https://blog.csdn.net/qq_37717687/article/details/121766657

我们要关注的是游戏服务器的搭建,所以第一步 我们要关注的skynet 网络功能

之前用上面链接里面的一个echo 例子 试了一下,发现 一个问题 ,读取出来的数据包不完整 ,代码如下

local skynet = require "skynet"
local socket = require "skynet.socket"

local clients = {}

function connect(fd, addr)
    --启用连接,开始等待接收客户端消息
    print(fd .. " connected addr:" .. addr)
    socket.start(fd)
    clients[fd] = {}
    --消息处理
    while true do
        local readdata = socket.read(fd) --利用协程实现阻塞模式
        --正常接收
        if readdata ~= nil then
            print(fd .. " recv " .. readdata)
            for k,v in pairs(clients) do --广播
                socket.write(k, readdata)
            end
        --断开连接
        else 
            print(fd .. " close ")
            socket.close(fd)
            clients[fd] = nil
        end    
    end
end

skynet.start(function()
    local listenfd = socket.listen("0.0.0.0", 8888) --监听所有ip,端口8888
    socket.start(listenfd, connect) --新客户端发起连接时,conncet方法将被调用。
end)

这就很要命,研究了一下,是因为 skynet 的网络消息格式是固定的 2字节头(消息长度)+消息内容。

skynet 网络相关的是流式读取,也就是尝试读取的,一个数据包过来,可能要读好几次,至于为什么这么设计,可能是与内存池有关,内存池的分配都是按块的,所以才分批读取,后面单独再研究

单个socket每次从内核尝试读取的数据字节数为sz(第6行),这个值保存在s->p.size中,初始是MIN_READ_BUFFER(64b),当实际读到的数据等于sz时,sz扩大一倍(8-9行);如果小于sz的一半,则设置sz为原来的一半(10-11行)。

比如,客户端发了一个1kb的数据,socket线程会从内核里依次读取64b,128b,256b,512b,64b数据,总共需读取5次,即会向gateserver服务发5条消息,一个TCP包被切割成5个数据块。第5次尝试读取1024b数据,所以可能会读到其他TCP包的数据(只要客户端有发送其他数据)。接下来,客户端再发一个1kb的数据,socket线程只需从内核读取一次即可。

https://www.cnblogs.com/cnxkey/articles/15945319.html

static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
    int sz = s->p.size;
    char * buffer = MALLOC(sz);
    int n = (int)read(s->fd, buffer, sz);
    if (n<0) {
        FREE(buffer);
        switch(errno) {
        case EINTR:
            break;
        case AGAIN_WOULDBLOCK:
            skynet_error(NULL, "socket-server: EAGAIN capture.");
            break;
        default:
            // close when error
            force_close(ss, s, l, result);
            result->data = strerror(errno);
            return SOCKET_ERR;
        }
        return -1;
    }
    if (n==0) {
        FREE(buffer);
        force_close(ss, s, l, result);
        return SOCKET_CLOSE;
    }

    if (s->type == SOCKET_TYPE_HALFCLOSE) {
        // discard recv data
        FREE(buffer);
        return -1;
    }

    stat_read(ss,s,n);

    if (n == sz) {
        s->p.size *= 2;
    } else if (sz > MIN_READ_BUFFER && n*2 < sz) {
        s->p.size /= 2;
    }

    result->opaque = s->opaque;
    result->id = s->id;
    result->ud = n;
    result->data = buffer;
    return SOCKET_DATA;
}

第一次读取的字节 就是 #define MIN_READ_BUFFER 64

每次读满之后,字节扩大一倍

 if (n == sz) {
        s->p.size *= 2;
}

了解了这里 发现 改成如下这样

local skynet = require "skynet"

local socket = require "skynet.socket"
local protobuf = require "protobuf" 
local cjson = require "cjson"


local clients = {}
local logic_service = nil

-- skynet.register_protocol {
    -- name = "client",
    -- id = skynet.PTYPE_CLIENT,    
    -- unpack = skynet.tostring,   --- 将C point 转换为lua 二进制字符串
-- }

function connect(fd, addr)
    -- 启用连接 开始等待接收客户端消息
    print(" fd = ".. fd .. " connected addr:" .. addr)
    
    socket.start(fd)
    
    clients[fd] = {}
    -- 消息处理 不断循环接收
    while true do
        local readdata = socket.read(fd) -- 利用协程实现阻塞模式
        -- 正常接收
        if readdata ~= nil then

            --local ret_data = "fd = " .. fd .. " ==> " .. string.byte(readdata, 2);
            
            
            
            --readdata = string.unpack('>', readdata);
            
            print("-----------> ret_data " .. readdata);
            
            local s = readdata:byte(1) * 256 + readdata:byte(2) --数据包的长度
            
            print(" s = " .. s)
            
            local s1 = readdata:byte(3) * 256 + readdata:byte(4) --base64的长度
            
            print(" s1 = " .. s1)
            
            -- local real_data = readdata:sub(5, 100);
            
            -- --local base64_data = string.unpack(">s2", real_data)
            
            -- print("-----------> ret_data " .. real_data);
            
            --readdata = skynet.tostring(readdata)
        
            --local data_tb = protobuf.decode("cs.Person", readdata)
            
            --print(data_tb)
            
            --print(data_tb.name)
            -- print(data_tb.id)
            -- for _,v in ipairs(data_tb.phone) do
                -- print("\t" .. v.number, v.type)
            -- end
        
            --print(fd .. " recv " .. readdata)
            
            --local ret_data = "fd = " .. fd .. " ==> " .. readdata;
            
            if logic_service ~= nil then
                local msg_tb = {}
                msg_tb['fd'] = fd
                msg_tb['content'] = readdata;
                
                local stringbuffer = protobuf.encode("cs.Person", { 
                    name = "linsh", 
                    id = 1,
                    email = readdata,
                }) 
                
                -- 发送protobuf 数据
                local ret_msg_protobuf = skynet.call(logic_service, "lua", "doMsg_protobuf_data", stringbuffer)
                --ret_data = "fd = " .. fd .. " ==> " .. ret_msg
                
                local ret_tb = protobuf.decode("cs.Person", ret_msg_protobuf)
                
                print(ret_tb.name)
                print(ret_tb.id)
                for _,v in ipairs(ret_tb.phone) do
                    print("\t" .. v.number, v.type)
                end
                
                ret_data = cjson.encode(ret_tb)
                
                print(" ret msg from logic_service fd = " .. fd .. " ==> " .. ret_data)
                
            end
            
            if fd % 2 == 1 then
                skynet.sleep(1000)
            end
            
            for k, v in pairs(clients) do
                socket.write(k, ret_data)
            end
            
        -- 断开连接
        else 
            print(fd .. "close")
            socket.close(fd)
            clients[fd] = nil
        
        end
    end
end

local CMD = {}

function CMD.broad_msg_to_user(source, msg_content)

    print("=========> " .. msg_content)
    
    for k, v in pairs(clients) do
        socket.write(k, msg_content)
    end
end

skynet.start(function()
    logic_service = skynet.newservice("logic")
    local listenfd = socket.listen("0.0.0.0", 8888) -- 监听所有ip,端口8888
    socket.start(listenfd, connect) --新客户端发起连接时,conncet方法将被调用
    
    protobuf.register_file "./examples/protos/Person.pb" 
    skynet.error("register:Person.pb") 
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        -- f(source, ...)
        f(source, ...)
    end)
    
end)

发现读取时,还是不行, 读取出来的总是不完整,了解到skynet 是分批读取,必须这些分批读取拼接起来,组成一个完整的数据包,所以需要一个netpack的中间层 来拼接数据包

--[[
协议分析:长度信息法
skynet提供的C语言编写的netpack模块,它能高效解析2字节长度信息的协议
]]


local skynet = require "skynet"
local socketdriver = require "skynet.socketdriver"
local netpack = require "skynet.netpack"

--不能同时包含socketdriver和socket,因为socket里已经register_protocol了socket类型
-- local socket = require "skynet.socket"

local queue -- message queue
local clients = {}

--解码底层传来的SOCKET类型消息
function socket_unpack(msg, size)
    return netpack.filter(queue, msg, size)
end

--处理底层传来的SOCKET类型消息
function socket_dispatch(_, _, q, type, ...)
    skynet.error("socket_dispatch type:" .. (type or "nil"))
    queue = q
    if type == "open" then
        process_connect(...)    
    elseif type == "data" then
        process_msg(...)    
    elseif type == "more" then
        process_more(...)
    elseif type == "close" then
        process_close(...)
    elseif type == "error" then
        process_error(...)
    elseif type == "warning" then
        process_warning(...)
    end    
end

--有新连接
function process_connect(fd, addr)
    skynet.error("new conn fd:" .. fd .. " addr:" .. addr)
    socketdriver.start(fd)    
    clients[fd] = {}
    clients[fd].last_time = skynet.time()
end

--关闭连接
function process_close(fd)
    skynet.error("close fd:" .. fd)
    if clients[fd] then
        clients[fd] = nil
    end
end

--发送错误
function process_error(fd, error)
    skynet.error("error fd:" .. fd .. " error:" .. error)
    if clients[fd] then
        clients[fd] = nil
    end
end

--发送警告
function process_warning(fd, size)
    skynet.error("warning fd:" .. fd .. " size:" .. size)
end

--刚好收到一条完整消息
function process_msg(fd, msg, size)
    local str = netpack.tostring(msg, size)
    skynet.error("recv from fd:" .. fd .. " str:" .. str .. " size = " .. size)
    
    for k, v in pairs(clients) do
        skynet.error(" all fd = " .. k)
        if k ~= fd then
            --socket.write(k, msg_content)
            
            socketdriver.send(k, str)
        end
    end
    
    if fd % 2 == 0 then
        --skynet.sleep(1000)
    end
    
    socketdriver.send(fd, str)
    
    clients[fd].last_time = skynet.time()
    
    
end

--收到多于1条消息时
function process_more()
    for fd, msg, size in netpack.pop, queue do
        print("process_more = fd = " .. fd .. " size = " .. size)
        skynet.fork(process_msg, fd, msg, size) --开启协程,协程保证了process_msg执行的时序性
    end
end

function check_clients(timeout)
    while true do
        skynet.sleep(timeout)
        local now_time = skynet.time()
        skynet.error(" check clients now time = " .. now_time)
        
        local gap_time = 0
        
        for k, v in pairs(clients) do
            skynet.error(" all fd = " .. k .. " last_time = " .. v.last_time)
            
            gap_time = now_time - v.last_time
            if gap_time >= 30 then
                socketdriver.close(k)
                --clients[k] = nil
                --socketdriver.shutdown(k)
            end
            
            --socketdriver.send(k, " hearbeat fd = " .. k)
        end
    end
end


skynet.start(function ()
    --注册SOCKET类型信息
    skynet.register_protocol({
        name = "socket",
        id = skynet.PTYPE_SOCKET,
        unpack = socket_unpack,
        dispatch = socket_dispatch,
    })
    --注册Lua类型消息
    --开启监听
    local listenfd = socketdriver.listen("0.0.0.0", 8887)
    socketdriver.start(listenfd)
    
    skynet.fork(check_clients, 3000)
end)

这样就能完整收包 解包

客户端测试代码 python脚本 有粘包 也能正常解

import socket
import threading
import struct
import sys
#import Person_pb2

global recvThreadExit

def recvMsg(socket_obj):
 while True:
  print(" recvThreadExit = " + str(recvThreadExit))
  if recvThreadExit == 1:
     break
  ret = str(obj.recv(1024))
  print("recv msg :")
  print(ret)
  
  if len(ret) == 0 :
    #obj.shutdown(socket.SHUT_RDWR)
    #obj.close()
    print("close ==>")
    break

# person = Person_pb2.Person()
# person.name = "ayuliao"
# person.id = 6
# person.email = "xxx@xx.com"
#person.phone = "13229483229"
# person_protobuf_data = person.SerializeToString()
#print(f'person_protobuf_data: {person_protobuf_data}')

recvThreadExit = 0

obj = socket.socket()

obj.connect(('192.168.1.200', 8887))

recv_thread = threading.Thread(target=recvMsg, args=(obj,))
recv_thread.start()

while True:

    #a = raw_input()
    #print(a)
    inp = raw_input("input string: ").strip(' ') 
    if inp == "quit" :
       #obj.shutdown(socket.SHUT_RDWR)
       obj.close()
       recvThreadExit = 1
       print("sssssssssssssssssseeeeeeeeeee")
       break
    #print(inp)
    ##obj.sendall(bytes(inp, encoding="utf-8"))
    print("len = " + str(len(inp)) )
    #fmt_str = ">1h1h1h" + str(len(inp)) + "s"
    fmt_str = ">1h" + str(len(inp)) + "s"
    print(fmt_str)
    send_data = struct.pack(fmt_str, len(inp), bytes(inp))

    print(send_data + " len = " + str(len(send_data)) )
    obj.send(send_data)
    #obj.send(send_data)
    #obj.send(send_data)
    #obj.send(send_data)
    #obj.send(send_data)
    if inp == "q":
        break

print(" exti ==>")
sys.exit(3)

网络的一个坑点就是这个,切不可拿其他人的示例来用,要自己多测试。

 类似资料: