Serf是一个去中心化的集群成员管理、故障检测解决方案。Kong用它做什么?先看看它怎么启动的吧:
Serf相关的配置文件主要是这些:
vagrant@precise64:~$ grep serf /usr/local/kong/kong.conf
serf_path = serf
serf_log = /usr/local/kong/logs/serf.log
serf_node_id = /usr/local/kong/serf/serf.id
serf_pid = /usr/local/kong/pids/serf.pid
serf_event = /usr/local/kong/serf/serf_event.sh
kong/cmd/utils/serf_signals.lua这个文件负责启动Serf,最关键的启动参数是这些
local args = setmetatable({
["-bind"] = kong_config.cluster_listen,
["-rpc-addr"] = kong_config.cluster_listen_rpc,
["-advertise"] = kong_config.cluster_advertise,
["-encrypt"] = kong_config.cluster_encrypt_key,
["-log-level"] = "err",
["-profile"] = kong_config.cluster_profile,
["-node"] = serf.node_name,
["-event-handler"] = "member-join,member-leave,member-failed,"
.."member-update,member-reap,user:"
..serf_event_name.."="..kong_config.serf_event
}, Serf.args_mt)
最关键的是-event-handler这个参数,这个参数对Kong节点加入、离开、故障、更新等事件定义了处理脚本kong_config.serf_event,也就是/usr/local/kong/serf/serf_event.sh这个文件,这个文件的内容:
vagrant@precise64:~$ cat /usr/local/kong/serf/serf_event.sh
#!/bin/sh
PAYLOAD=`cat` # Read from stdin
if [ "$SERF_EVENT" != "user" ]; then
PAYLOAD="{\"type\":\"${SERF_EVENT}\",\"entity\": \"${PAYLOAD}\"}"
fi
CMD="\
local http = require 'resty.http' \
local client = http.new() \
client:set_timeout(5000) \
client:connect('127.0.0.1', 8001) \
client:request { \
method = 'POST', \
path = '/cluster/events/', \
body = [=[${PAYLOAD}]=], \
headers = { \
['content-type'] = 'application/json' \
} \
}"
/usr/local/openresty/bin/resty -e "$CMD"
处理方式是POST 127.0.0.1:8001/cluster/events/,在nginx-kong.conf里面定义了/cluster/events/这个URI如何处理:
server {
server_name kong_admin;
listen 0.0.0.0:8001;
location / {
default_type application/json;
content_by_lua_block {
require('lapis').serve('kong.api')
}
}
}
终于又回到了kong.api,在kong/api/routes/cluster.lua定义了["/cluster/events/"] 的处理方法:
["/cluster/events/"] = {
POST = function(self, dao_factory, helpers)
local message_t = self.params
-- Trigger event in the node
singletons.events:publish(message_t.type, message_t)
return responses.send_HTTP_OK()
end
}
最关键的就是singletons.events:publish(message_t.type, message_t),在Kong的内部发起了一个Event,
function Events:subscribe(event_name, fn)
if fn then
self._mediator:subscribe({event_name}, function(message_t)
fn(message_t)
return nil, true -- Required to tell mediator to continue processing other events
end)
end
end
function Events:publish(event_name, message_t)
if event_name then
self._mediator:publish({string.upper(event_name)}, message_t)
end
end
Kong在启动的时候注册了Event的处理方法:
function Kong.init()
attach_hooks(events, require "kong.core.hooks")
end
local function attach_hooks(events, hooks)
for k, v in pairs(hooks) do
events:subscribe(k, v)
end
end
处理方法都在kong.core.hooks里,具体处理的事件如下:
return {
[events.TYPES.ENTITY_UPDATED] = function(message_t)
invalidate(message_t)
end,
[events.TYPES.ENTITY_DELETED] = function(message_t)
invalidate(message_t)
end,
[events.TYPES.ENTITY_CREATED] = function(message_t)
invalidate(message_t)
end,
[events.TYPES.CLUSTER_PROPAGATE] = function(message_t)
singletons.serf:event(message_t)
end,
[events.TYPES["MEMBER-JOIN"]] = function(message_t)
-- Sometimes multiple nodes are sent at once
-- 省略具体逻辑
end,
[events.TYPES["MEMBER-JOIN"]] = function(message_t)
-- Sometimes multiple nodes are sent at once
-- 省略具体逻辑
end,
[events.TYPES["MEMBER-LEAVE"]] = function(message_t)
-- Sometimes multiple nodes are sent at once
-- 省略具体逻辑
end,
[events.TYPES["MEMBER-FAILED"]] = function(message_t)
-- Sometimes multiple nodes are sent at once
-- 省略具体逻辑
end,
[events.TYPES["MEMBER-UPDATE"]] = function(message_t)
-- Sometimes multiple nodes are sent at once
-- 省略具体逻辑
end,
[events.TYPES["MEMBER-REAP"]] = function(message_t)
-- Sometimes multiple nodes are sent at once
-- 省略具体逻辑
end
end
实体的更新、删除、创建事件都由invalidate(message_t)处理,来看看具体内容吧:
-- 以下代码无任何删减改!
local function invalidate(message_t)
if message_t.collection == "consumers" then
cache.delete(cache.consumer_key(message_t.entity.id))
elseif message_t.collection == "apis" then
if message_t.entity then
cache.delete(cache.api_key(message_t.entity.id))
end
cache.delete(cache.all_apis_by_dict_key())
elseif message_t.collection == "plugins" then
-- Handles both the update and the delete
invalidate_plugin(message_t.old_entity and message_t.old_entity or message_t.entity)
end
end
local function invalidate_plugin(entity)
cache.delete(cache.plugin_key(entity.name, entity.api_id, entity.consumer_id))
end
总结一下就是清缓存,看来Kong里面核心的数据consumer、api、plugin都是做了缓存的。Serf还能做什么?集群节点的监控,比如某个节点宕机了,我们可以通知域名服务器将故障的节点去掉。