基于lua-resty-redis的redis连接池

戚哲
2023-12-01

基于lua-resty-redis的redis连接池 [轮]

@author     karminski <code.karminski@outlook.com>
@version    161028:3
@link       http://blog.eth6.org/src/wheel/redis_connection_pool_with_lua_nginx_module.html

这几天用oprensty写了一些东西, 在用lua-resty-redis连接redis的时候需要一个连接池, 原本想着这东西也没多难于是就手动撸了一个, 写完了接入到系统在测试的时候发现不妙了. 不但redis连接巨慢, 而且失败率也很高. RTFM之后终于写出了一个稳定版本.

模块分为这几个部分:

-- Pseudocode
<code #1>
redis_factory = function(redis_config)
    h               = redis_config
    h.redis         = lua-resty-redis
    h.cosocket_pool = cosocket_pool config
    h.commands      = lua-resty-redis proxy commands name
    h.connect       = lua-resty-redis connect warp
    h.spawn_client  = function(): spawn redis-proxy-client -><code #2>

    self            = {}
    self.pool       = storage redis instance name
    self.construct  = function(): do your own construct 
    self.spawn      = function(): call h.spawn_client() by name and storage spawned instance into ngx.ctx
    self.destruct   = function(): close and put into cosocket connection pool 
end

<code #2>
spawn_client instance, aka redis-proxy-client = {
    name            = redis instance name
    redis_instance  = lua-resty-redis instance
    connect         = h.connect
    connect_info    = h.name
    construct       = function(): proxy lua-resty-redis all method into self
    ... (proxy function from lua-resty-redis)
}

原型部分:

  • h变量用来存储配置.

  • h.connect()函数封装了lua-resty-redis的连接方法.

  • h.spawn_client()方法用来生成包装lua-resty-redis的redis-proxy-client.

  • redis-proxy-client将lua-resty-redis内部的方法全部包装为自己内部的方法, 方法名称从h.commands指定.

redis-proxy-client中包含整个h变量的连接方法和连接参数, 该proxy构造过程将所有的proxy方法中均插入对lua-resty-redis产生的实例进行检测并重新连接的逻辑, 而且只在代理方法被调用时进行检测, 极大地缩短了redis实例初始化和使用之间的时间差, 同时又能克服与redis之间由于网络问题或设置问题导致的连接中断.

当redis_factory实例化后,返回的table包含以下几个方法:

  • self.construct()是预留的构造函数.

  • self.pool变量用来存储已经实例化的redis实例的名称.

  • redis-proxy-client.redis_instance, 真正的实例化的redis保存在redis-proxy-client.redis_instance,而redis-proxy-client则在redis_factory:spawn()过程中被保存在ngx.ctx中(必须将redis实例放置在ngx.ctx,否则会引起竞争导致命令请求失败).

  • self.destruct()用来销毁连接池中的所有redis实例, 其内部调用set_keepalive()后会立即将redis连接置为关闭状态. 并将redis连接放入ngx_lua cosocket连接池.

模块详细实现如下:

连接池代码:

--[[

    redis_factory.lua
    Redis factory method. 
    You can also find it at https://gist.github.com/karminski/33fa9149d2f95ff5d802


    @version    151019:5
    @author     karminski 
    @license    MIT

    @changelogs 
                151019:5 CLEAN test code.
                151016:4 REFACTORY spawn logic.
                151012:3 REWRITE redis proxy.
                151009:2 ADD connection mode feature.
                150922:1 INIT commit.

]]--

local redis_factory = function(h)
    
    local h           = h

    h.redis           = require('resty.redis')
    h.cosocket_pool   = {max_idel = 10000, size = 200}

    h.commands        = {
        "append",            "auth",              "bgrewriteaof",
        "bgsave",            "bitcount",          "bitop",
        "blpop",             "brpop",
        "brpoplpush",        "client",            "config",
        "dbsize",
        "debug",             "decr",              "decrby",
        "del",               "discard",           "dump",
        "echo",
        "eval",              "exec",              "exists",
        "expire",            "expireat",          "flushall",
        "flushdb",           "get",               "getbit",
        "getrange",          "getset",            "hdel",
        "hexists",           "hget",              "hgetall",
        "hincrby",           "hincrbyfloat",      "hkeys",
        "hlen",
        "hmget",             "hmset",             "hscan",
        "hset",
        "hsetnx",            "hvals",             "incr",
        "incrby",            "incrbyfloat",       "info",
        "keys",
        "lastsave",          "lindex",            "linsert",
        "llen",              "lpop",              "lpush",
        "lpushx",            "lrange",            "lrem",
        "lset",              "ltrim",             "mget",
        "migrate",
        "monitor",           "move",              "mset",
        "msetnx",            "multi",             "object",
        "persist",           "pexpire",           "pexpireat",
        "ping",              "psetex",            "psubscribe",
        "pttl",
        "publish",           "punsubscribe",      "pubsub",
        "quit",
        "randomkey",         "rename",            "renamenx",
        "restore",
        "rpop",              "rpoplpush",         "rpush",
        "rpushx",            "sadd",              "save",
        "scan",              "scard",             "script",
        "sdiff",             "sdiffstore",
        "select",            "set",               "setbit",
        "setex",             "setnx",             "setrange",
        "shutdown",          "sinter",            "sinterstore",
        "sismember",         "slaveof",           "slowlog",
        "smembers",          "smove",             "sort",
        "spop",              "srandmember",       "srem",
        "sscan",
        "strlen",            "subscribe",         "sunion",
        "sunionstore",       "sync",              "time",
        "ttl",
        "type",              "unsubscribe",       "unwatch",
        "watch",             "zadd",              "zcard",
        "zcount",            "zincrby",           "zinterstore",
        "zrange",            "zrangebyscore",     "zrank",
        "zrem",              "zremrangebyrank",   "zremrangebyscore",
        "zrevrange",         "zrevrangebyscore",  "zrevrank",
        "zscan",
        "zscore",            "zunionstore",       "evalsha",
        -- resty redis private command
        "set_keepalive",     "init_pipeline",     "commit_pipeline",      
        "array_to_hash",     "add_commands",      "get_reused_times",
    }

    -- connect
    -- @param table connect_info, e.g { host="127.0.0.1", port=6379, pass="", timeout=1000, database=0}
    -- @return boolean result
    -- @return userdata redis_instance
    h.connect = function(connect_info)
        local redis_instance = h.redis:new()
        redis_instance:set_timeout(connect_info.timeout)
        if not redis_instance:connect(connect_info.host, connect_info.port) then 
            return false, nil
        end
        if connect_info.pass ~= '' then
            redis_instance:auth(connect_info.pass)
        end
        redis_instance:select(connect_info.database)
        return true, redis_instance
    end

    -- spawn_client
    -- @param table h, include config info
    -- @param string name, redis config name
    -- @return table redis_client
    h.spawn_client = function(h, name)

        local self = {}
        
        self.name           = ""
        self.redis_instance = nil
        self.connect        = nil
        self.connect_info   = {
            host = "",   port = 0,    pass = "", 
            timeout = 0, database = 0
        }

        -- construct
        self.construct = function(_, h, name)
            -- set info
            self.name         = name
            self.connect      = h.connect
            self.connect_info = h[name]
            -- gen redis proxy client
            for _, v in pairs(h.commands) do
                self[v] = function(self, ...)
                    -- instance test and reconnect  
                    if (type(self.redis_instance) == 'userdata: NULL' or type(self.redis_instance) == 'nil') then
                        local ok
                        ok, self.redis_instance = self.connect(self.connect_info)
                        if not ok then return false end
                    end
                    -- get data
                    return self.redis_instance[v](self.redis_instance, ...)
                end
            end
            return true
        end

        -- do construct
        self:construct(h, name) 

        return self
    end     



    local self = {}

    self.pool  = {} -- redis client name pool

    -- construct
    -- you can put your own construct code here.
    self.construct = function()
        return
    end

    -- spawn
    -- @param string name, redis database serial name
    -- @return boolean result
    -- @return userdata redis
    self.spawn = function(_, name)
        if self.pool[name] == nil then
            ngx.ctx[name] = h.spawn_client(h, name) 
            self.pool[name] = true
            return true, ngx.ctx[name]
        else
            return true, ngx.ctx[name]
        end
    end

    -- destruct
    -- @return boolean allok, set_keepalive result
    self.destruct = function()
        local allok = true
        for name, _ in pairs(self.pool) do
            local ok, msg = ngx.ctx[name].redis_instance:set_keepalive(
                h.cosocket_pool.max_idel, h.cosocket_pool.size
            )
            if not ok then allok = false end 
        end
        return allok
    end

    -- do construct
    self.construct() 
        
    return self
end


return redis_factory

使用方法:

package.path  = '/home/www/bin_lua/?.lua;;./?.lua;' .. package.path

-- config example
local config = {
    redis_a = { -- your connection name 
        host = '127.0.0.1',
        port = 6379,
        pass = '',
        timeout = 200, -- watch out this value
        database = 0,
    },
    redis_b = {
        host = '127.0.0.1',
        port = 6379,
        pass = '',
        timeout = 200,
        database = 0,
    },
}

local redis_factory = require('redis_factory')(config) -- import config when construct
local ok, redis_a = redis_factory:spawn('redis_a')
local ok, redis_b = redis_factory:spawn('redis_b')
local ok = redis_a:set('test', "aaaaaaaaaaa")
if not ok then ngx.say("failed") end
local ok = redis_b:set('test', "bbbbbbbbbbb")
if not ok then ngx.say("failed") end

redis_factory:destruct() -- important, better call this method on your main function return

ngx.say("end")

注意事项:

  • 必须打开lua_code_cache,不打开的情况,性能不仅是打开情况的一半以下,而且持续并发请求的时候会 造成平均响应时间的持续上升,最终拖垮整个服务.

  • 建议按需求设置timeout和max_idel以及size,其中timeout是连接池最为致命的参数,建议该值不小一次请求的平均时间,如果timeout过小,则会造成"lua tcp socket read timed out"和"attempt to send data on a closed socket"错误,造成这种错误的原因是timeout过小,连接被redis过早释放,导致cosocket连接池无法重复利用连接.例如:

    2015/10/19 15:03:16 [error] 9117#0: *2673 lua tcp socket read timed out, client: 10.121.95.83, server: bin_lua, request: "GET /test HTTP/1.1", host: "bin_lua"
    
    2015/10/19 15:03:16 [error] 9117#0: *2673 attempt to send data on a closed socket: u:00000000402FAFC8, c:0000000000000000, ft:0 eof:0, client: 127.0.0.1, server: bin_lua, request: "GET /test HTTP/1.1", host: "bin_lua"
    
  • lua-resty-redis的实例应该存放于ngx.ctx全局变量中(单个请求生命周期的全局), 如果存放在本地变量中, 会造成竞争引发的请求错误等故障, 例如:

    2015/10/13 15:30:32 [error] 1347#0: *841234 lua entry thread aborted: runtime error: /home/www/bin_lua/redis_factory.lua:188: bad request
    
    
  • 这一点lua-resty-redis作者也在文档中有详细的说明: (引用自https://github.com/openresty/...)

    Limitations
    
    This library cannot be used in code contexts like init_by_lua, set_by_lua, log_by_lua, and header_filter_by_lua where the ngx_lua cosocket API is not available.
    
    The resty.redis object instance cannot be stored in a Lua variable at the Lua module level, because it will then be shared by all the concurrent requests handled by the same nginx worker process (see http://wiki.nginx.org/HttpLuaModule#Data_Sharing_within_an_Nginx_Worker ) and result in bad race conditions when concurrent requests are trying to use the same resty.redis instance (you would see the "bad request" or "socket busy" error to be returned from the method calls). You should always initiate resty.redis objects in function local variables or in the ngx.ctx table. These places all have their own data copies for each request.
    

以上

参考:

https://github.com/openresty/...

 类似资料: