Nginx官方自带了非常多的核心模块再加上第三方的模块能够满足我们大部分的业务需要,但是业务的需求、业务的场景变化需要添加些额外的功能,如果自己去开发一个nginx模块相对来说比较笨重,我们可以使用lua脚本直接内嵌到nginx当中实现一些业务逻辑,完成一些特殊的功能需求。
ngx_lua是Nginx的一个模块,将Lua嵌入到Nginx中,从而可以使用Lua来编写脚本,这样就可以使用Lua编写应用脚本,部署到Nginx中运行
我们选择使用OpenResty,因为他是由Nginx核心加很多第三方模块组成,其最大的亮点是默认集成了Lua开发环境,使得Nginx可以作为一个Web Server使用。 而且OpenResty提供了大量组件如Mysql、Redis、Memcached等等,使在Nginx上开发更方便更简单,不需要我们去引入集成。
GitHub:https://github.com/steve0511/resty-redis-cluster
通过上面的地址下载扩展包,并在Nginx的配置文件中引入
lua_package_path "/path/lualib/?.lua;"; lua_package_cpath "/path/lualib/?.so;";
/path/lualib/为引入路径(安装详情通过上述地址查看)
以下为通过lua文件,建立redis集群链接
local config = {
name = "testCluster", --rediscluster name
serv_list = { --redis cluster node list(host and port),
{ ip = "127.0.0.1", port = 7001 },
{ ip = "127.0.0.1", port = 7002 },
{ ip = "127.0.0.1", port = 7003 },
{ ip = "127.0.0.1", port = 7004 },
{ ip = "127.0.0.1", port = 7005 },
{ ip = "127.0.0.1", port = 7006 }
},
keepalive_timeout = 60000, --redis connection pool idle timeout
keepalive_cons = 1000, --redis connection pool size
connection_timout = 1000, --timeout while connecting
max_redirection = 5, --maximum retry attempts for redirection,
auth = "pass" --set password while setting auth
}
local redis_cluster = require "rediscluster"
local red_c = redis_cluster:new(config)
local v, err = red_c:get("name")
if err then
ngx.log(ngx.ERR, "err: ", err)
else
ngx.say(v)
end
一般来说,缓存有两个原则。
一是越靠近用户的请求越好。比如,能用本地缓存的就不要发送 HTTP 请求,能用 CDN 缓存的就不要打到源站,能用 OpenResty 缓存的就不要查询数据库。
二是尽量使用本进程和本机的缓存解决。因为跨了进程和机器甚至机房,缓存的网络开销 就会非常大,这一点在高并发的时候会非常明显
在 OpenResty 中,缓存的设计和使用也遵循这两个原则。OpenResty 中有两个缓存的组件:shared_dict 缓存和 lru 缓存。前者只能缓存字符串对象,缓存的数据有且只有 一份,每一个 worker 都可以进行访问,所以常用于 worker 之间的数据通信。后者则可以 缓存所有的 Lua 对象,但只能在单个 worker 进程内访问,有多少个 worker,就会有多少 份缓存数据。
lua_shared_dict redis_cluster_slot_locks 100k;
lua_shared_dict redis_cluster_addr 20k;
lua_shared_dict my_cache 1M;
lua_shared_dict my_locks 100k;
lua_shared_dict ipc_cache 1M;
我们可以在Nginx的配置文件中预先定义好共享缓存,然后可以在.lua文件中通过ngx.shared.+缓存名称的方式获取。
数据源在 MySQL 数据库中,缓存的数据放在共享字典中,超时时间为1分钟。在这 1分钟内的时间里,所有的请求都从缓存中获取数据,MySQL 没有任何的压力。但是,一旦到达 1分钟,也就是缓存数据失效的那一刻,如果正好有大量的并发请求进来,在缓存中没有查询到结果,就要触发查询数据源的函数,那么这些请求全部都将去查询 MySQL 数据库, 直接造成数据库服务器卡顿,甚至卡死。
Openresty可以利用lua-resty-lock 加锁,利用的是OpenResty 自带的 resty 库.不过,在上面 lua-resty-lock 的实现中,你需要自己来处理加锁、解锁、获取过期数据、重试、异常处理等各种问题,还是相当繁琐的 我们可以使用lua-resty-mlcache,
更多的定义及使用方法请进入GitHub地址查看详细文档
mlcache工作原理大致如下:
local key=ngx.re.match(ngx.var.request_uri,"/([0-9]+).html") //正则匹配请求的uri地址
local mlcache= require "resty.mlcache"//引用mlcache包
local lrucache = require "resty.lrucache"//引用lrucache包
lru = lrucache.new(100)//创建一个缓存实例,100为最大存储数量
lru:set(key, "111", ttl)//设置缓存key,value,过期时间
ngx.header.content_type="text/plain"
-- L3的回调
local function fetch_shop(key,cache)
return "id=1"
end
if type(key) == "table" then //匹配请求url的类型(table)
local cache,err=mlcache.new("cache_name","my_cache",{ //新建mlcahe缓存
lru_size = 500, --设置的缓存的个数
ttl = 5, --缓存过期时间
neg_ttl = 5, --L3返回nil的保存时间
ipc_shm = "ipc_cache" , --用于将L2的缓存设置到L1
})
if not cache then
ngx.log(ngx.ERR,"缓存创建失败",err)
end
cache:update(xxxx)//传播获取到的值,使其他work进程可以更新数据
local shop_detail,err,level=cache:get(key[1],{
lru_size = 500, --设置的缓存的个数
ttl = 5, --缓存过期时间
neg_ttl = 5, --L3返回nil的保存时间
ipc_shm = "my_cache" , --用于将L2的缓存设置到L1
},fetch_shop,key[1],cache)
if level == 3 then
local ok,err= cache:update()
local ok, err = cache:set(key[1], nil, 456)
local ok,err= cache:update()
end
cache:update()
end
lua_shared_dict my_cache 1M;
content_by_lua_block {
ngx.header.content_type="text/plain"
local mlcache= require "resty.mlcache"
local cache,err=mlcache.new("cache_name","my_cache",{
lru_size = 500, --设置的缓存的个数
ttl = 5, --缓存过期时间
neg_ttl = 5, --L3返回nil的保存时间
ipc_shm = "my_cache" , --用于将L2的缓存设置到L1
})
local ok, err = cache:set("key_1", nil, 123)
if not ok then
ngx.log(ngx.ERR, "failed to set value in cache: ", err)
return ngx.exit(500)
end
ngx.exit(200)
}
更多的定义及使用方法请进入GitHub地址查看详细文档
GitHub:
lock:https://github.com/openresty/lua-resty-lock,
mlcache: https://github.com/thibaultcha/lua-resty-mlcache,
lrucache: https://github.com/openresty/lua-resty-lrucache
缓存穿透是指查询一个一定不存在的数据,由于缓存不命中,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
缓存空对象:如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存(有一个比较巧妙的作法是,可以将这个不存在的key预先设定一个特定值。)但它的过期时间会很短,最长不超过五分钟。
不过相应的也会出现问题:空值做了缓存,意味着缓存层中存了更多的键,需要更多的内存空间,如果是有意攻击的话,会占用大量资源。
缓存层和存储层的数据会有一段时间窗口的不一致,可能会对业务有一定影响。例如过期时间设置为 5 分钟,如果此时存储层添加了这个数据,那此段时间就会出现缓存层和存储层数据的不一致
bloomfilter提前拦截(布隆过滤器)
布隆过滤器的原理是,当一个元素被加入集合时,通过K个Hash函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。
数组的容量即使再大,也是有限的。那么随着元素的增加,插入的元素就会越多,位数组中被置为1的位置因此也越多,这就会造成一种情况:当一个不在布隆过滤器中的元素,经过同样规则的哈希计算之后,得到的值在位数组中查询,有可能这些位置因为之前其它元素的操作先被置为1了 所以,有可能一个不存在布隆过滤器中的会被误判成在布隆过滤器中。这就是布隆过滤器的一个缺陷。但是,如果布隆过滤器判断某个元素不在布隆过滤器中,那么这个值就一定不在布隆过滤器中。
GitHub:https://github.com/RedisBloom/RedisBloom
布隆过滤器存在误判的情况,在Redis中有两个值决定布隆过滤器的准确率:
error_rate:允许布隆过滤器的错误率,这个值越低过滤器的位数组的大小越大,占用空间也就越大
initial_size:布隆过滤器可以储存的元素个数,当实际存储的元素个数超过这个值之后,过滤器的准确率会下降
$ redis-server --loadmodule /path/to/redisbloom.so INITIAL_SIZE 400 ERROR_RATE 0.004
//.so为引入文件
nginx.conf(分发层(distribute)nginx配置,通过consul中的存储,动态切换节点及选取负载均衡方式)
#user nobody;
worker_processes 1;
daemon off;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
lua_code_cache off; #关闭代码缓存
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
lua_package_path "/usr/local/openresty/lualib/project/common/lualib/?.lua;;";
lua_shared_dict load 20k;
init_worker_by_lua_file /usr/local/openresty/lualib/project/init.lua;
upstream upstream_server_hash {
#ip_hash;
#least_conn; #最少连接数
hash $key; #商品id
server 118.24.109.254:8002;
upsync 118.24.109.254:8700/v1/kv/upstreams/servers upsync_timeout=6m upsync_interval=3s upsync_type=consul strong_dependency=on;
upsync_dump_path /usr/local/openresty/nginx/conf/servers.conf; #生成配置文件
include /usr/local/openresty/nginx/conf/servers.conf;
}
upstream upstream_server_round {
#ip_hash;
#least_conn; #最少连接数
server 118.24.109.254:8002;
upsync 118.24.109.254:8700/v1/kv/upstreams/servers upsync_timeout=6m upsync_interval=3s upsync_type=consul strong_dependency=on;
upsync_dump_path /usr/local/openresty/nginx/conf/servers.conf; #生成配置文件
include /usr/local/openresty/nginx/conf/servers.conf;
}
server {
listen 80;
if ( $request_uri ~* \/(\d+).html$ ) {
set $key $1;
}
location /{
#root /www;
#content_by_lua '
# ngx.header.content_type="text/plain";
# ngx.say (ngx.var.key);
#';
set_by_lua_file $upstream_server /usr/local/openresty/lualib/project/set.lua;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header REMOTE-HOST $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_connect_timeout 30;
proxy_send_timeout 30;
proxy_read_timeout 60;
proxy_pass http://$upstream_server;
#index index.html index.htm;
}
}
init.lua(通过consul中存储的负载均衡节点信息取出)
--进程启动触发
--
local delay = 5
local handler
handler = function (premature)
local resty_consul = require('resty.consul')
local consul = resty_consul:new({
host = "",
port = 8700,
connect_timeout = (60*1000), -- 60s
read_timeout = (60*1000), -- 60s
})
local res, err = consul:get_key("load") --获取value值
if not res then
ngx.log(ngx.ERR, err)
return
end
ngx.shared.load:set('load',res.body[1].Value)
end
if 0 == ngx.worker.id() then
--第一次立即执行
local ok, err = ngx.timer.at(0, handler)
if not ok then
ngx.log(ngx.ERR, "failed to create the timer: ", err)
return
end
--第二次定时执行
local ok, err = ngx.timer.every(delay, handler)
if not ok then
ngx.log(ngx.ERR, "failed to create the timer: ", err)
return
end
ngx.log(ngx.ERR,"-----进程启动")
end
set.lua (根据consul中存储的负载均衡方式,动态切换)
--写业务逻辑
local flag=ngx.shared.load:get("load")
local load_blance=''
if tonumber(flag) == 1 then
load_blance="upstream_server_round"
elseif tonumber(flag) == 2 then
load_blance="upstream_server_conn"
else
load_blance="upstream_server_hash"
end
return load_blance
nginx.conf(应用层(application),利用布隆过滤器,mlcache多级缓存,尽可能的保证数据在缓存中的存储,使请求发生时可以利用获取数据,并非请求到源服务器)
worker_processes 2;
daemon off; # 避免nginx在后台运行
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
lua_code_cache off; #关闭代码缓存
sendfile on;
keepalive_timeout 65;
lua_package_path "/usr/local/openresty/lualib/project/common/lualib/?.lua;;/usr/local/openresty/lualib/project/common/resty-redis-cluster/lib/?.lua;;";
lua_package_cpath "/usr/local/openresty/lualib/project/common/resty-redis-cluster/src/?.so;;";
#gzip on;
lua_shared_dict redis_cluster_slot_locks 100k;
lua_shared_dict redis_cluster_addr 20k;
lua_shared_dict my_cache 1M;
lua_shared_dict my_locks 100k;
lua_shared_dict ipc_cache 1M;
init_worker_by_lua_file /usr/local/openresty/lualib/project/init.lua;
server {
listen 80;
location /{
root /usr/local/openresty/lualib/project/info;
content_by_lua_file /usr/local/openresty/lualib/project/cache.lua;
#index index.html index.htm;
}
}
cache.lua
local key=ngx.re.match(ngx.var.request_uri,"/([0-9]+).html")
local mlcache= require "resty.mlcache"
local common= require "resty.common"
-- L3的回调
local function fetch_shop(key)
--利用布隆过滤器判断
if (common.filter('shop_list',key) == 1 ) then
local content=common.send('/index.php')
if content==nil then
return
end
return content
end
return
--ngx.log(ngx.ERR,"请求到L3了",key)
end
if type(key) == "table" then
local cache,err=mlcache.new("cache_name","my_cache",{
lru_size = 500, --设置的缓存的个数
ttl = 5, --缓存过期时间
neg_ttl = 6, --L3返回nil的保存时间
ipc_shm = "ipc_cache" --用于将L2的缓存设置到L1
})
if not cache then
ngx.log(ngx.ERR,"缓存创建失败",err)
end
local shop_detail,err,level=cache:get(key[1],nil,fetch_shop,key[1])
-- 刷新
if level == 3 then
--ngx.say(shop_detail)
math.randomseed(tostring(os.time()))
local expire_time =math.random(1,6)
cache:set(key[1],{ttl = expire_time},shop_detail)
end
-- ngx.say(shop_detail)
end
common.lua(布隆过滤器判断请求的数据是否能够找到)
local common={}
local ngx_re_split=require("ngx.re").split
local redis_cluster = require "rediscluster"
function common.filter(key,val)
local ip_addr=ngx.shared.redis_cluster_addr:get('redis-addr')
local ip_addr_table=ngx_re_split(ip_addr,",")
local redis_addr={}
for key, value in ipairs(ip_addr_table) do
local ip_addr=ngx_re_split(value,":")
redis_addr[key]={ip=ip_addr[1],port=ip_addr[2]}
end
local config = {
name = "testCluster", --rediscluster name
serv_list=redis_addr,
keepalive_timeout = 60000, --redis connection pool idle timeout
keepalive_cons = 1000, --redis connection pool size
connection_timout = 1000, --timeout while connecting
max_redirection = 5, --maximum retry attempts for redirection
auth="sixstar"
}
local red_c = redis_cluster:new(config)
--在redis当中嵌入lua脚本
local res,err = red_c:eval([[
local key=KEYS[1]
local val= ARGV[1]
local res,err=redis.call('bf.exists',key,val)
return res
-- 业务逻辑
]],1,key,val)
if err then
ngx.log(ngx.ERR,"过滤错误:",err)
return false
end
return res
end
function common.send(url)
local req_data
local method = ngx.var.request.method
if method == "POST" then
req_data={method = ngx.HTTP_POST,body = ngx.req.read_body() }
elseif method == "PUT" then
req_data={method = ngx.HTTP_PUT, body = ngx.req.read_body() }
else
req_data={method = ngx.HTTP_GET}
end
local uri=ngx.var.request.uri
if uri== nil then
uri=''
end
ngx.say(url..uri)
local res,err=ngx.location.capture(
url..uri,
req_data
)
if res.status == 200 then
return res.body
end
return
end
return common
Hash Tag 原理是:当一个 key 包含 {} 的时候,不对整个 key 做 hash,而仅对 {} 包括的字符串做 hash。 Hash Tag 可以让不同的 key 拥有相同的 hash 值,从而分配在同一个槽里;这样针对不同 key 的批量操作(mget/mset 等),以及事务、Lua 脚本等都可以支持。
local config = {
name = "testCluster", --rediscluster name
--[[serv_list = { --redis cluster node list(host and port),
{ ip = "118.24.109.254", port = 6391 },
]]
serv_list=redis_addr,
keepalive_timeout = 60000, --redis connection pool idle timeout
keepalive_cons = 1000, --redis connection pool size
connection_timout = 1000, --timeout while connecting
max_redirection = 5, --maximum retry attempts for redirection
auth="sixstar"
}
local redis_cluster = require "rediscluster"
local red_c = redis_cluster:new(config)
local ok,err=red_c:eval([[
local key=KEYS[1]
local val=ARGV[1]
-- local res,err=redis.call("bf.add",key,"users")
local res,err=redis.call("bf.exists",key,"users")
return val
]],1,'{bf_1}','test') //{bf_1}存储在同一redis节点
现在我们的逻辑是lua代码里会先去worker内存中取数据,再到redis里面取数据,如果redis取不到,就会去源站获取,如果还是取不到,就需要重建缓存了。
但是重建缓存有一个问题,因为我们的服务可能是多实例的,虽然在nginx层我们通过流量分发将请求通过id分发到了不同的nginx应用层上。那么到了接口服务层,可能多次请求访问的是不同的实例,那么可能会导致多个机器去重建读取相同的数据,然后写入缓存中,这就有了分布式重建缓存的并发冲突问题:
可能2个实例获取到的数据快照不一样,但是新数据先写入缓存,如果这个时候另外一个实例的缓存后写入,产生了覆盖。
分布式锁满足条件
为了满足上述条件,我们可以利用redis存储命令(set)来控制,该命令可以帮助我们只有在键不存在时进行设置,设置过期时间。
SET key value [EX seconds] [PX milliseconds] [NX|XX]
redis->set($key,$value,['nx','ex'=>10]);
因为我们采用了分布式缓存的方式来减少对服务端的请求,但当我们DB存储数据更新时,我们的redis缓存数据可能并没有更新,导致我们从缓冲中获取的数据为旧数据,这样我们的数据会不一致。
为了保证缓存更新完全,通常我们会将缓存全部清除,然后重新以最新的数据重新生成缓存。但是在清除的这段时间,有请求到来,发现缓存为空时会去DB中直接获取数据并更新缓存,但如果此时的更新正在进行,那么该请求更新的缓存很可能是尚未完成更新的数据。进而导致后续的请求获取的数据全部为旧数据。
为了防止上述的情况发生,我们要上面的清除缓存,更新数据,更新缓存按照顺序执行,这里我们用到了redis的队列。
//插入队列
local setName=KEYS[1]
local jobName=ARGV[1]
local res=redis.call("SADD",setName,jobName) //判断redis中是否已经存在,保证插入队列中的任务唯一性
if res == 1 then
return redis.call("LPUSH",jobName,ARGV[2])//加入队列
end
return 0
//消耗
local res=redis.call("RPOP",jobName)
if type(res) == "boolean" then
return 0
end
redis.call("SREM",setName,jobName)//从队列中移除
return res
将其放入队列后我们可以通过定时任务去执行消耗队列的流程,将队列中需要执行的操作取出,得到需要更新的数据信息,查询数据库并更新缓存。
通过上面讲述我们知道可以通过redis的队列保证了我们的DB数据和缓存的一致性。那么什么时候应该去更新缓存中的数据呢?我们可以通过该发布,订阅模块来通知redis去更新缓存。
URL:http://redisdoc.com/pubsub/index.html
当我们进行数据更新后,利用该模块发布通知,那么订阅了该模块的应用层服务器将收到通知,然后进行缓存的更新。
init.lua(应用层初始化文件)
local sub=function (premature)
-- 连接redis去订阅
local redis= require "resty.redis"
local cjson= require "cjson"
local red=redis:new()
red:connect("xxxx",6391)
red:auth("xxxx")
while 1 do
local ok,err =red:subscribe("cacheUpdate")
local res,err=red:read_reply() --订阅的频道获取
ngx.log(ngx.ERR,"订阅消息---------",cjson.encode(res))
--缓存更新
local mlcache= require "resty.mlcache"
local cache,err=mlcache.new("cache_name","my_cache",{
lru_size = 500, --设置的缓存的个数
ttl = 5, --缓存过期时间
neg_ttl = 6, --L3返回nil的保存时间
ipc_shm = "ipc_cache" --用于将L2的缓存设置到L1
})
if not cache then
ngx.log(ngx.ERR,"缓存创建失败",err)
end
math.randomseed(tostring(os.time()))
local expire_time =math.random(1,6)
res=cache:set("cache",{ttl = expire_time},"xxx")
ngx.log(ngx.ERR,"缓存更新成功吗",res)
end
end
为了防止一次性大量的连接全部请求到服务器,我们利用令牌桶算法进行了一定的限流和降级。
令牌桶算法
简单来说令牌桶算法类似于我们小时候的水池放水,进水的题目。既要保证持续的有水进来,又不能让水池溢出。
local res,err = red_c:eval([[
-- 通过url判断访问是哪个服务
local app_name=KEYS[1] --标识是哪个应用
local rareLimit=redis.call("HMGET",app_name,"max_burst","rate","curr_permits","last_second") --从redis当中取出来
local max_burst=tonumber(rareLimit[1]) --令牌桶最大的容量
local rate=tonumber(rareLimit[2]) --每秒生成令牌的个数
local last_second = rareLimit[4] -- 最后一次的访问时间
local curr_second = ARGV[1] -- 当前时间
local curr_permits=tonumber(rareLimit[3]) --当前桶里剩余令牌(跟1s内的消耗有关系,请求有关系)
local permits = ARGV[2] -- 这次请求消耗的令牌数
local default_curr_permits=max_burst -- 默认令牌数,默认添加10个
--通过判断是有有最后一次的访问时间,如果满足条件,证明不是第一次获取令牌了
if (type(last_second)) ~= "boolean" and last_second ~= nil then
--距离我上次访问,按照速率大概产生了多个个令牌
local reverse_permits = math.floor( ((curr_second - last_second) / 1000) * rate ) -- 当前时间-最后一次访问的时间 / 1000 * 速率
-- 如果访问的时间较短,允许突发数量
local expect_curr_permits= reverse_permits + curr_permits
-- 最终能够使用的令牌,不能超过最大令牌数
default_curr_permits=math.min(expect_curr_permits,max_burst)
else
--记录下访问时间,并且减去消耗令牌数
local res=redis.call("HMSET",app_name,"last_second",curr_second,"curr_permits",default_curr_permits - permits)
if res =="ok" then
return 1
end
end
-- 当前能够使用的令牌 - 请求消耗的令牌 > 0,就能够成功获取令牌
if (default_curr_permits - permits >= 0 ) then
--记录下访问时间,并且减去消耗令牌数
redis.call("HMSET",app_name,"last_second",curr_second,"curr_permits",default_curr_permits - permits)
return 1
else
--令牌不够,只需要重置下桶里的令牌数,否则是负值导致,如果一直刷新将导致一直无法请求(看需求)
redis.call("HMSET",app_name,"curr_permits",default_curr_permits)
return 0
end
-- 业务逻辑
]],1,key,string.format("%.3f",ngx.now() * 1000 ),1)