当前位置: 首页 > 工具软件 > Lua VM > 使用案例 >

Nginx+lua+Openresty(分布式缓存)

贺季
2023-12-01

Nginx官方自带了非常多的核心模块再加上第三方的模块能够满足我们大部分的业务需要,但是业务的需求、业务的场景变化需要添加些额外的功能,如果自己去开发一个nginx模块相对来说比较笨重,我们可以使用lua脚本直接内嵌到nginx当中实现一些业务逻辑,完成一些特殊的功能需求。

ngx_luaNginx的一个模块,将Lua嵌入到Nginx中,从而可以使用Lua来编写脚本,这样就可以使用Lua编写应用脚本,部署到Nginx中运行

我们选择使用OpenResty,因为他是由Nginx核心加很多第三方模块组成,其最大的亮点是默认集成了Lua开发环境,使得Nginx可以作为一个Web Server使用。 而且OpenResty提供了大量组件如Mysql、Redis、Memcached等等,使在Nginx上开发更方便更简单,不需要我们去引入集成。

链接
负载均衡-consul
Openresty
分发层
应用层
redis集群
PHP请求

http://opm.openresty.org/ Openresty类库

lua+Redis集群

GitHub:https://github.com/steve0511/resty-redis-cluster
通过上面的地址下载扩展包,并在Nginx的配置文件中引入

lua_package_path "/path/lualib/?.lua;"; lua_package_cpath "/path/lualib/?.so;";
/path/lualib/为引入路径(安装详情通过上述地址查看)

以下为通过lua文件,建立redis集群链接

local config = {
    name = "testCluster",                   --rediscluster name
    serv_list = {                           --redis cluster node list(host and port),
        { ip = "127.0.0.1", port = 7001 },
        { ip = "127.0.0.1", port = 7002 },
        { ip = "127.0.0.1", port = 7003 },
        { ip = "127.0.0.1", port = 7004 },
        { ip = "127.0.0.1", port = 7005 },
        { ip = "127.0.0.1", port = 7006 }
    },
    keepalive_timeout = 60000,              --redis connection pool idle timeout
    keepalive_cons = 1000,                  --redis connection pool size
    connection_timout = 1000,               --timeout while connecting
    max_redirection = 5,                    --maximum retry attempts for redirection,
    auth = "pass"                           --set password while setting auth
}

local redis_cluster = require "rediscluster"
local red_c = redis_cluster:new(config)

local v, err = red_c:get("name")
if err then
    ngx.log(ngx.ERR, "err: ", err)
else
    ngx.say(v)
end

多级缓存

一般来说,缓存有两个原则。

一是越靠近用户的请求越好。比如,能用本地缓存的就不要发送 HTTP 请求,能用 CDN 缓存的就不要打到源站,能用 OpenResty 缓存的就不要查询数据库。

二是尽量使用本进程和本机的缓存解决。因为跨了进程和机器甚至机房,缓存的网络开销 就会非常大,这一点在高并发的时候会非常明显

在 OpenResty 中,缓存的设计和使用也遵循这两个原则。OpenResty 中有两个缓存的组件:shared_dict 缓存和 lru 缓存。前者只能缓存字符串对象,缓存的数据有且只有 一份,每一个 worker 都可以进行访问,所以常用于 worker 之间的数据通信。后者则可以 缓存所有的 Lua 对象,但只能在单个 worker 进程内访问,有多少个 worker,就会有多少 份缓存数据。

  lua_shared_dict  redis_cluster_slot_locks 100k;
  lua_shared_dict  redis_cluster_addr 20k;
  lua_shared_dict  my_cache 1M;
  lua_shared_dict  my_locks 100k;
  lua_shared_dict ipc_cache 1M;

我们可以在Nginx的配置文件中预先定义好共享缓存,然后可以在.lua文件中通过ngx.shared.+缓存名称的方式获取。

缓存击穿问题

数据源在 MySQL 数据库中,缓存的数据放在共享字典中,超时时间为1分钟。在这 1分钟内的时间里,所有的请求都从缓存中获取数据,MySQL 没有任何的压力。但是,一旦到达 1分钟,也就是缓存数据失效的那一刻,如果正好有大量的并发请求进来,在缓存中没有查询到结果,就要触发查询数据源的函数,那么这些请求全部都将去查询 MySQL 数据库, 直接造成数据库服务器卡顿,甚至卡死。

解决方式
  1. 获取锁,直到成功或超时。如果超时,则抛出异常,返回。如果成功,继续向下执行。
  2. 再去缓存中。如果存在值,则直接返回;如果不存在,则继续往下执行如果成功获取到锁的话,就可以保证只有一个请求去数据源更新数据,并更新到缓存中了。
  3. 查询 DB ,并更新到缓存中,返回值。

Openresty可以利用lua-resty-lock 加锁,利用的是OpenResty 自带的 resty 库.不过,在上面 lua-resty-lock 的实现中,你需要自己来处理加锁、解锁、获取过期数据、重试、异常处理等各种问题,还是相当繁琐的 我们可以使用lua-resty-mlcache,

更多的定义及使用方法请进入GitHub地址查看详细文档

mlcache工作原理大致如下:

  • L1:最近使用Lua resty lrucache的Lua VM缓存最少。提供最快的查找(如果已填充),并避免耗尽工作人员的Lua VM内存。
  • L2:lua_shared_dict内存区域由所有工作人员共享。仅当一级未命中时才访问此级别,并阻止工作人员请求三级缓存。
  • L3:去redis或存储中读取数据,并将其设置为二级缓存供其他人员使用
local key=ngx.re.match(ngx.var.request_uri,"/([0-9]+).html") //正则匹配请求的uri地址
local mlcache= require "resty.mlcache"//引用mlcache包
local lrucache   = require "resty.lrucache"//引用lrucache包
lru = lrucache.new(100)//创建一个缓存实例,100为最大存储数量

lru:set(key, "111", ttl)//设置缓存key,value,过期时间

ngx.header.content_type="text/plain"
-- L3的回调
local function fetch_shop(key,cache)
      return "id=1"
end

if type(key) == "table" then //匹配请求url的类型(table)
      local cache,err=mlcache.new("cache_name","my_cache",{ //新建mlcahe缓存
            lru_size = 500, --设置的缓存的个数
            ttl = 5, --缓存过期时间
            neg_ttl = 5, --L3返回nil的保存时间
            ipc_shm = "ipc_cache" , --用于将L2的缓存设置到L1
      })
      if not cache then
        ngx.log(ngx.ERR,"缓存创建失败",err)
      end
      cache:update(xxxx)//传播获取到的值,使其他work进程可以更新数据

      local shop_detail,err,level=cache:get(key[1],{
                lru_size = 500, --设置的缓存的个数
                ttl = 5, --缓存过期时间
                neg_ttl = 5, --L3返回nil的保存时间
                ipc_shm = "my_cache" , --用于将L2的缓存设置到L1
      },fetch_shop,key[1],cache)

      if level == 3 then
              local ok,err= cache:update()
              local ok, err = cache:set(key[1], nil, 456)
              local ok,err= cache:update()         
      end
      cache:update()
end
Nginx配置文件使用mlcache
lua_shared_dict  my_cache 1M;

content_by_lua_block {            
      ngx.header.content_type="text/plain"
      local mlcache= require "resty.mlcache"
      local cache,err=mlcache.new("cache_name","my_cache",{
          lru_size = 500, --设置的缓存的个数
           ttl = 5, --缓存过期时间
           neg_ttl = 5, --L3返回nil的保存时间
           ipc_shm = "my_cache" , --用于将L2的缓存设置到L1
      })
      local ok, err = cache:set("key_1", nil, 123)
      if not ok then
          ngx.log(ngx.ERR, "failed to set value in cache: ", err)
          return ngx.exit(500)
      end
     ngx.exit(200)
             }

更多的定义及使用方法请进入GitHub地址查看详细文档

GitHub
lock:https://github.com/openresty/lua-resty-lock,
mlcache: https://github.com/thibaultcha/lua-resty-mlcache,
lrucache: https://github.com/openresty/lua-resty-lrucache

缓存穿透

缓存穿透是指查询一个一定不存在的数据,由于缓存不命中,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

  • 危害: 对底层数据源(mysql, hbase, http接口, rpc调用等等)压力过大,有些底层数据源不具备高并发性。
  • 原因:可能是代码本身或者数据存在的问题造成的,也很有可能是一些恶意攻击、爬虫等等(因为http读接口都是开放的)
  • 如何发现:可以分别记录cache命中数,以及总调用量,如果发现空命中(cache都没有命中)较多,可能就会在缓存穿透问题。
处理方式

缓存空对象:如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存(有一个比较巧妙的作法是,可以将这个不存在的key预先设定一个特定值。)但它的过期时间会很短,最长不超过五分钟。

不过相应的也会出现问题:空值做了缓存,意味着缓存层中存了更多的键,需要更多的内存空间,如果是有意攻击的话,会占用大量资源。

缓存层和存储层的数据会有一段时间窗口的不一致,可能会对业务有一定影响。例如过期时间设置为 5 分钟,如果此时存储层添加了这个数据,那此段时间就会出现缓存层和存储层数据的不一致

bloomfilter提前拦截(布隆过滤器)
布隆过滤器的原理是,当一个元素被加入集合时,通过K个Hash函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。

数组的容量即使再大,也是有限的。那么随着元素的增加,插入的元素就会越多,位数组中被置为1的位置因此也越多,这就会造成一种情况:当一个不在布隆过滤器中的元素,经过同样规则的哈希计算之后,得到的值在位数组中查询,有可能这些位置因为之前其它元素的操作先被置为1了 所以,有可能一个不存在布隆过滤器中的会被误判成在布隆过滤器中。这就是布隆过滤器的一个缺陷。但是,如果布隆过滤器判断某个元素不在布隆过滤器中,那么这个值就一定不在布隆过滤器中。

GitHub:https://github.com/RedisBloom/RedisBloom

布隆过滤器存在误判的情况,在Redis中有两个值决定布隆过滤器的准确率:
error_rate:允许布隆过滤器的错误率,这个值越低过滤器的位数组的大小越大,占用空间也就越大
initial_size:布隆过滤器可以储存的元素个数,当实际存储的元素个数超过这个值之后,过滤器的准确率会下降

$ redis-server --loadmodule /path/to/redisbloom.so INITIAL_SIZE 400 ERROR_RATE 0.004 
//.so为引入文件

将多级缓存,布隆过滤器结合控制

nginx.conf(分发层(distribute)nginx配置,通过consul中的存储,动态切换节点及选取负载均衡方式)


#user  nobody;
worker_processes  1;
daemon off;
events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    lua_code_cache off; #关闭代码缓存
    sendfile        on;
    #tcp_nopush     on;
    #keepalive_timeout  0;
    keepalive_timeout  65;
    #gzip  on;
    lua_package_path "/usr/local/openresty/lualib/project/common/lualib/?.lua;;";
    lua_shared_dict  load 20k;
    init_worker_by_lua_file /usr/local/openresty/lualib/project/init.lua;
    upstream upstream_server_hash {
           #ip_hash;
           #least_conn; #最少连接数
           hash $key; #商品id
           server 118.24.109.254:8002;
           upsync 118.24.109.254:8700/v1/kv/upstreams/servers upsync_timeout=6m upsync_interval=3s upsync_type=consul strong_dependency=on;
           upsync_dump_path /usr/local/openresty/nginx/conf/servers.conf; #生成配置文件
           include /usr/local/openresty/nginx/conf/servers.conf;
    }

    upstream upstream_server_round {
           #ip_hash;
           #least_conn; #最少连接数
           server 118.24.109.254:8002;
           upsync 118.24.109.254:8700/v1/kv/upstreams/servers upsync_timeout=6m upsync_interval=3s upsync_type=consul strong_dependency=on;
           upsync_dump_path /usr/local/openresty/nginx/conf/servers.conf; #生成配置文件
           include /usr/local/openresty/nginx/conf/servers.conf;
    }

    server {
        listen       80;
        if ( $request_uri ~* \/(\d+).html$ ) {
               set $key $1;
        }

        location /{
            #root   /www;
            #content_by_lua '
            #    ngx.header.content_type="text/plain";
            #    ngx.say (ngx.var.key);
            #';
           set_by_lua_file $upstream_server  /usr/local/openresty/lualib/project/set.lua;
           proxy_set_header Host $host;
           proxy_set_header X-Real-IP $remote_addr;
           proxy_set_header REMOTE-HOST $remote_addr;
           proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
           proxy_connect_timeout 30;
           proxy_send_timeout 30;
           proxy_read_timeout 60;
           proxy_pass http://$upstream_server;
           #index  index.html index.htm;
        }
       

}

init.lua(通过consul中存储的负载均衡节点信息取出)

--进程启动触发
--
 local delay = 5
 local handler
 handler = function (premature)
    local resty_consul = require('resty.consul')
    local consul = resty_consul:new({
            host            = "",
            port            = 8700,
            connect_timeout = (60*1000), -- 60s
            read_timeout    = (60*1000), -- 60s
        })
        local res, err = consul:get_key("load") --获取value值
        if not res then
           ngx.log(ngx.ERR, err)
           return
        end  
     ngx.shared.load:set('load',res.body[1].Value)
 end

if  0 == ngx.worker.id() then
     --第一次立即执行
     local ok, err = ngx.timer.at(0, handler)
     if not ok then
         ngx.log(ngx.ERR, "failed to create the timer: ", err)
         return
     end

     --第二次定时执行
      local ok, err = ngx.timer.every(delay, handler)
          if not ok then
              ngx.log(ngx.ERR, "failed to create the timer: ", err)
              return
          end
      ngx.log(ngx.ERR,"-----进程启动")
end


set.lua (根据consul中存储的负载均衡方式,动态切换)

--写业务逻辑
local flag=ngx.shared.load:get("load")
local load_blance=''
if tonumber(flag) == 1 then
     load_blance="upstream_server_round"
elseif tonumber(flag) == 2 then
     load_blance="upstream_server_conn"
else
     load_blance="upstream_server_hash"
end
return load_blance

nginx.conf(应用层(application),利用布隆过滤器,mlcache多级缓存,尽可能的保证数据在缓存中的存储,使请求发生时可以利用获取数据,并非请求到源服务器)

worker_processes  2;
daemon off; # 避免nginx在后台运行

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    lua_code_cache off; #关闭代码缓存

    sendfile        on;

    keepalive_timeout  65;
    lua_package_path "/usr/local/openresty/lualib/project/common/lualib/?.lua;;/usr/local/openresty/lualib/project/common/resty-redis-cluster/lib/?.lua;;";
    lua_package_cpath "/usr/local/openresty/lualib/project/common/resty-redis-cluster/src/?.so;;";
    #gzip  on;
    lua_shared_dict  redis_cluster_slot_locks 100k;
    lua_shared_dict  redis_cluster_addr 20k;
    lua_shared_dict  my_cache 1M;
    lua_shared_dict  my_locks 100k;
    lua_shared_dict ipc_cache 1M;
    init_worker_by_lua_file /usr/local/openresty/lualib/project/init.lua;
    server {
        listen       80;

        location /{
            root   /usr/local/openresty/lualib/project/info;
                       content_by_lua_file /usr/local/openresty/lualib/project/cache.lua;
           #index  index.html index.htm;
        }

}

cache.lua

local key=ngx.re.match(ngx.var.request_uri,"/([0-9]+).html")
local mlcache= require "resty.mlcache"
local common= require "resty.common"
-- L3的回调
local function fetch_shop(key)   
     --利用布隆过滤器判断
     if (common.filter('shop_list',key) == 1 ) then
          local content=common.send('/index.php')
          if content==nil then
                return
          end
          return content
     end
     return
     --ngx.log(ngx.ERR,"请求到L3了",key)
end

if type(key) == "table" then
      local cache,err=mlcache.new("cache_name","my_cache",{
            lru_size = 500, --设置的缓存的个数
            ttl = 5, --缓存过期时间
            neg_ttl = 6, --L3返回nil的保存时间
            ipc_shm = "ipc_cache" --用于将L2的缓存设置到L1
      })
      if not cache then
        ngx.log(ngx.ERR,"缓存创建失败",err)
      end
  
      local shop_detail,err,level=cache:get(key[1],nil,fetch_shop,key[1])

      -- 刷新
      if  level == 3 then
         --ngx.say(shop_detail)
          math.randomseed(tostring(os.time()))
          local expire_time =math.random(1,6)
          cache:set(key[1],{ttl = expire_time},shop_detail)
      end
     -- ngx.say(shop_detail)



end

common.lua(布隆过滤器判断请求的数据是否能够找到)

local common={}
local ngx_re_split=require("ngx.re").split
local redis_cluster = require "rediscluster"

function common.filter(key,val)
    local ip_addr=ngx.shared.redis_cluster_addr:get('redis-addr')
    local ip_addr_table=ngx_re_split(ip_addr,",")
    local redis_addr={}
    for key, value in ipairs(ip_addr_table) do
        local ip_addr=ngx_re_split(value,":")
        redis_addr[key]={ip=ip_addr[1],port=ip_addr[2]}
    end
    local config = {
        name = "testCluster",                   --rediscluster name
        serv_list=redis_addr,
        keepalive_timeout = 60000,              --redis connection pool idle timeout
        keepalive_cons = 1000,                  --redis connection pool size
        connection_timout = 1000,               --timeout while connecting
        max_redirection = 5,                    --maximum retry attempts for redirection
        auth="sixstar"
        }
    local red_c = redis_cluster:new(config)
    --在redis当中嵌入lua脚本
     local res,err = red_c:eval([[
             local  key=KEYS[1]
             local  val= ARGV[1]
             local  res,err=redis.call('bf.exists',key,val)
             return res
            -- 业务逻辑
     ]],1,key,val)
     if  err then
        ngx.log(ngx.ERR,"过滤错误:",err)
        return false
     end
     return res
end


function common.send(url)
     local req_data
     local method = ngx.var.request.method
     if method == "POST" then
         req_data={method = ngx.HTTP_POST,body = ngx.req.read_body() }
     elseif method == "PUT" then
         req_data={method = ngx.HTTP_PUT, body = ngx.req.read_body() }
     else
         req_data={method = ngx.HTTP_GET}
     end
     local uri=ngx.var.request.uri
     if uri== nil then
           uri=''
     end
     ngx.say(url..uri)
     local res,err=ngx.location.capture(
         url..uri,
         req_data
     )
     if res.status == 200 then
         return res.body
     end
     return
end

return common


Redis集群当中使用hashTag让key分配在某个节点

Hash Tag 原理是:当一个 key 包含 {} 的时候,不对整个 key 做 hash,而仅对 {} 包括的字符串做 hash。 Hash Tag 可以让不同的 key 拥有相同的 hash 值,从而分配在同一个槽里;这样针对不同 key 的批量操作(mget/mset 等),以及事务、Lua 脚本等都可以支持。

local config = {
    name = "testCluster",                   --rediscluster name
    --[[serv_list = {                           --redis cluster node list(host and port),
        { ip = "118.24.109.254", port = 6391 },      
    ]]
    serv_list=redis_addr,
    keepalive_timeout = 60000,              --redis connection pool idle timeout
    keepalive_cons = 1000,                  --redis connection pool size
    connection_timout = 1000,               --timeout while connecting
    max_redirection = 5,                    --maximum retry attempts for redirection
    auth="sixstar"
    }

local redis_cluster = require "rediscluster"
local red_c = redis_cluster:new(config)

 local ok,err=red_c:eval([[
                local key=KEYS[1]
                local val=ARGV[1]
                -- local res,err=redis.call("bf.add",key,"users")

                local res,err=redis.call("bf.exists",key,"users")

                return  val

  ]],1,'{bf_1}','test') //{bf_1}存储在同一redis节点

分布式重建缓存的并发冲突问题

现在我们的逻辑是lua代码里会先去worker内存中取数据,再到redis里面取数据,如果redis取不到,就会去源站获取,如果还是取不到,就需要重建缓存了。

但是重建缓存有一个问题,因为我们的服务可能是多实例的,虽然在nginx层我们通过流量分发将请求通过id分发到了不同的nginx应用层上。那么到了接口服务层,可能多次请求访问的是不同的实例,那么可能会导致多个机器去重建读取相同的数据,然后写入缓存中,这就有了分布式重建缓存的并发冲突问题:

可能2个实例获取到的数据快照不一样,但是新数据先写入缓存,如果这个时候另外一个实例的缓存后写入,产生了覆盖。

  • 变更缓存重建更新redis之前,都需要先获取对应商品id的分布式锁
  • 拿到分布式锁之后,需要根据时间版本去比较一下,如果自己的版本新于redis中的版本,那么就更新,否则就不更新
  • 如果拿不到分布式锁,那么就等待,不断轮询等待,直到自己获取到分布式的锁

分布式锁满足条件

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。

为了满足上述条件,我们可以利用redis存储命令(set)来控制,该命令可以帮助我们只有在键不存在时进行设置,设置过期时间。

SET key value [EX seconds] [PX milliseconds] [NX|XX]

redis->set($key,$value,['nx','ex'=>10]);
  • EX seconds : 将键的过期时间设置为 seconds 秒。 执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value 。
  • PX milliseconds : 将键的过期时间设置为 milliseconds 毫秒。 执行 SET key value PX milliseconds 的效果等同于执行 PSETEX key milliseconds value 。
  • NX : 只在键不存在时, 才对键进行设置操作。 执行 SET key value NX 的效果等同于执行 SETNX key value 。
  • XX : 只在键已经存在时, 才对键进行设置操作。
Redis缓存和DB的一致性问题

因为我们采用了分布式缓存的方式来减少对服务端的请求,但当我们DB存储数据更新时,我们的redis缓存数据可能并没有更新,导致我们从缓冲中获取的数据为旧数据,这样我们的数据会不一致。

为了保证缓存更新完全,通常我们会将缓存全部清除,然后重新以最新的数据重新生成缓存。但是在清除的这段时间,有请求到来,发现缓存为空时会去DB中直接获取数据并更新缓存,但如果此时的更新正在进行,那么该请求更新的缓存很可能是尚未完成更新的数据。进而导致后续的请求获取的数据全部为旧数据。

为了防止上述的情况发生,我们要上面的清除缓存,更新数据,更新缓存按照顺序执行,这里我们用到了redis的队列。

//插入队列
local setName=KEYS[1]
local jobName=ARGV[1]
local res=redis.call("SADD",setName,jobName) //判断redis中是否已经存在,保证插入队列中的任务唯一性
if res == 1 then 
   return redis.call("LPUSH",jobName,ARGV[2])//加入队列
end
return 0

//消耗
local res=redis.call("RPOP",jobName)
if type(res) == "boolean" then 
   return 0
end
redis.call("SREM",setName,jobName)//从队列中移除
return res

将其放入队列后我们可以通过定时任务去执行消耗队列的流程,将队列中需要执行的操作取出,得到需要更新的数据信息,查询数据库并更新缓存。

发布,订阅

通过上面讲述我们知道可以通过redis的队列保证了我们的DB数据和缓存的一致性。那么什么时候应该去更新缓存中的数据呢?我们可以通过该发布,订阅模块来通知redis去更新缓存。

URL:http://redisdoc.com/pubsub/index.html

当我们进行数据更新后,利用该模块发布通知,那么订阅了该模块的应用层服务器将收到通知,然后进行缓存的更新。

init.lua(应用层初始化文件)

local sub=function (premature)
        -- 连接redis去订阅
        local redis= require "resty.redis"
        local cjson= require "cjson"
        local red=redis:new()
        red:connect("xxxx",6391)
        red:auth("xxxx")
        while 1 do
               local ok,err =red:subscribe("cacheUpdate")
               local res,err=red:read_reply() --订阅的频道获取
               ngx.log(ngx.ERR,"订阅消息---------",cjson.encode(res))
               --缓存更新
               local mlcache= require "resty.mlcache"
               local cache,err=mlcache.new("cache_name","my_cache",{
                           lru_size = 500, --设置的缓存的个数
                           ttl = 5, --缓存过期时间
                           neg_ttl = 6, --L3返回nil的保存时间
                           ipc_shm = "ipc_cache" --用于将L2的缓存设置到L1
                     })
               if not cache then
                  ngx.log(ngx.ERR,"缓存创建失败",err)
               end
               math.randomseed(tostring(os.time()))
               local expire_time =math.random(1,6)
               res=cache:set("cache",{ttl = expire_time},"xxx")
               ngx.log(ngx.ERR,"缓存更新成功吗",res)
        end
end
控制请求的数量

为了防止一次性大量的连接全部请求到服务器,我们利用令牌桶算法进行了一定的限流和降级。

令牌桶算法

  • 假设限制r/s,表示每秒会有r个令牌放入桶中,或者说每过1/r秒桶中增加一个令牌
  • 桶中最多存放b个令牌,当桶满时,新添加的令牌被丢弃或拒绝
  • 当一个n个字节大小的数据包到达,将从桶中删除n个令牌,接着数据包被发送到网络上
  • 如果桶中的令牌不足n个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么在缓冲区等待)

简单来说令牌桶算法类似于我们小时候的水池放水,进水的题目。既要保证持续的有水进来,又不能让水池溢出。

local res,err = red_c:eval([[
        --  通过url判断访问是哪个服务
        local  app_name=KEYS[1]   --标识是哪个应用
        local  rareLimit=redis.call("HMGET",app_name,"max_burst","rate","curr_permits","last_second")   --从redis当中取出来
        local  max_burst=tonumber(rareLimit[1])  --令牌桶最大的容量
        local  rate=tonumber(rareLimit[2])   --每秒生成令牌的个数
        local  last_second = rareLimit[4] -- 最后一次的访问时间
        local  curr_second = ARGV[1] -- 当前时间
        local  curr_permits=tonumber(rareLimit[3]) --当前桶里剩余令牌(跟1s内的消耗有关系,请求有关系)
        local  permits    =  ARGV[2]  -- 这次请求消耗的令牌数
        local  default_curr_permits=max_burst -- 默认令牌数,默认添加10个

        --通过判断是有有最后一次的访问时间,如果满足条件,证明不是第一次获取令牌了
        if (type(last_second)) ~= "boolean" and last_second ~= nil then
               --距离我上次访问,按照速率大概产生了多个个令牌
               local reverse_permits = math.floor( ((curr_second - last_second) / 1000) * rate )  -- 当前时间-最后一次访问的时间 / 1000 * 速率
               -- 如果访问的时间较短,允许突发数量
               local expect_curr_permits= reverse_permits + curr_permits
               -- 最终能够使用的令牌,不能超过最大令牌数
               default_curr_permits=math.min(expect_curr_permits,max_burst)
        else
              --记录下访问时间,并且减去消耗令牌数
             local res=redis.call("HMSET",app_name,"last_second",curr_second,"curr_permits",default_curr_permits - permits)
             if res =="ok" then
               return 1
             end
        end

        -- 当前能够使用的令牌 - 请求消耗的令牌 > 0,就能够成功获取令牌
        if (default_curr_permits - permits >= 0 ) then
             --记录下访问时间,并且减去消耗令牌数
             redis.call("HMSET",app_name,"last_second",curr_second,"curr_permits",default_curr_permits - permits)
             return 1
        else
             --令牌不够,只需要重置下桶里的令牌数,否则是负值导致,如果一直刷新将导致一直无法请求(看需求)
            redis.call("HMSET",app_name,"curr_permits",default_curr_permits)
            return 0
        end
        -- 业务逻辑
 ]],1,key,string.format("%.3f",ngx.now() * 1000 ),1)

总结

  1. 请求进入时,分发层利用令牌桶算法对请求的数据进行限流,降级。根据consul中设置的负载均衡节点及方式将请求分发到应用层
  2. 利用mlcache设置多级缓存。优先利用布隆过滤器防止大量空数据的请求,如存在进入redis进行缓存的获取。若不存在,将请求更新缓存的请求添加至更新队列,在保证添加请求唯一的情况下,由消耗队列统一获取进行更新。
  3. 后台DB数据有更新时,利用发布,订阅机制,通知应用层的各个服务器进行缓存更新
 类似资料: