当前位置: 首页 > 工具软件 > Serf > 使用案例 >

Kong为什么需要Serf

秦鸿羽
2023-12-01

Serf是一个去中心化的集群成员管理、故障检测解决方案。Kong用它做什么?先看看它怎么启动的吧:

Serf相关的配置文件主要是这些:

vagrant@precise64:~$ grep serf /usr/local/kong/kong.conf     
serf_path = serf
serf_log = /usr/local/kong/logs/serf.log
serf_node_id = /usr/local/kong/serf/serf.id
serf_pid = /usr/local/kong/pids/serf.pid
serf_event = /usr/local/kong/serf/serf_event.sh

kong/cmd/utils/serf_signals.lua这个文件负责启动Serf,最关键的启动参数是这些

local args = setmetatable({
    ["-bind"] = kong_config.cluster_listen,
    ["-rpc-addr"] = kong_config.cluster_listen_rpc,
    ["-advertise"] = kong_config.cluster_advertise,
    ["-encrypt"] = kong_config.cluster_encrypt_key,
    ["-log-level"] = "err",
    ["-profile"] = kong_config.cluster_profile,
    ["-node"] = serf.node_name,
    ["-event-handler"] = "member-join,member-leave,member-failed,"
                       .."member-update,member-reap,user:"
                       ..serf_event_name.."="..kong_config.serf_event
  }, Serf.args_mt)

最关键的是-event-handler这个参数,这个参数对Kong节点加入、离开、故障、更新等事件定义了处理脚本kong_config.serf_event,也就是/usr/local/kong/serf/serf_event.sh这个文件,这个文件的内容:

vagrant@precise64:~$ cat /usr/local/kong/serf/serf_event.sh 
#!/bin/sh

PAYLOAD=`cat` # Read from stdin
if [ "$SERF_EVENT" != "user" ]; then
  PAYLOAD="{\"type\":\"${SERF_EVENT}\",\"entity\": \"${PAYLOAD}\"}"
fi

CMD="\
local http = require 'resty.http' \
local client = http.new() \
client:set_timeout(5000) \
client:connect('127.0.0.1', 8001) \
client:request { \
  method = 'POST', \
  path = '/cluster/events/', \
  body = [=[${PAYLOAD}]=], \
  headers = { \
    ['content-type'] = 'application/json' \
  } \
}"

/usr/local/openresty/bin/resty -e "$CMD"

处理方式是POST 127.0.0.1:8001/cluster/events/,在nginx-kong.conf里面定义了/cluster/events/这个URI如何处理:

server {
    server_name kong_admin;
    listen 0.0.0.0:8001;

    location / {
        default_type application/json;
        content_by_lua_block {
            require('lapis').serve('kong.api')
        }
    }
}

终于又回到了kong.api,在kong/api/routes/cluster.lua定义了["/cluster/events/"] 的处理方法:

  ["/cluster/events/"] = { 
    POST = function(self, dao_factory, helpers)
      local message_t = self.params 
             
      -- Trigger event in the node
      singletons.events:publish(message_t.type, message_t)
             
      return responses.send_HTTP_OK()
    end      
  }          

最关键的就是singletons.events:publish(message_t.type, message_t),在Kong的内部发起了一个Event,

function Events:subscribe(event_name, fn)
  if fn then
    self._mediator:subscribe({event_name}, function(message_t)
      fn(message_t)
      return nil, true -- Required to tell mediator to continue processing other events
    end)
  end
end

function Events:publish(event_name, message_t)
  if event_name then
    self._mediator:publish({string.upper(event_name)}, message_t)
  end
end

Kong在启动的时候注册了Event的处理方法:

function Kong.init()
   attach_hooks(events, require "kong.core.hooks")
end

local function attach_hooks(events, hooks)
  for k, v in pairs(hooks) do
    events:subscribe(k, v)
  end
end

处理方法都在kong.core.hooks里,具体处理的事件如下:

return {
  [events.TYPES.ENTITY_UPDATED] = function(message_t)
    invalidate(message_t)
  end,
  [events.TYPES.ENTITY_DELETED] = function(message_t)
    invalidate(message_t)
  end,
  [events.TYPES.ENTITY_CREATED] = function(message_t)
    invalidate(message_t)
  end,
  [events.TYPES.CLUSTER_PROPAGATE] = function(message_t)
    singletons.serf:event(message_t)
  end,
  [events.TYPES["MEMBER-JOIN"]] = function(message_t)
    -- Sometimes multiple nodes are sent at once
    -- 省略具体逻辑
  end,
  [events.TYPES["MEMBER-JOIN"]] = function(message_t)
    -- Sometimes multiple nodes are sent at once
    -- 省略具体逻辑
  end,
  [events.TYPES["MEMBER-LEAVE"]] = function(message_t)
    -- Sometimes multiple nodes are sent at once
    -- 省略具体逻辑
  end,
  [events.TYPES["MEMBER-FAILED"]] = function(message_t)
    -- Sometimes multiple nodes are sent at once
    -- 省略具体逻辑
  end,
  [events.TYPES["MEMBER-UPDATE"]] = function(message_t)
    -- Sometimes multiple nodes are sent at once
    -- 省略具体逻辑
  end,
  [events.TYPES["MEMBER-REAP"]] = function(message_t)
    -- Sometimes multiple nodes are sent at once
    -- 省略具体逻辑
  end
end

实体的更新、删除、创建事件都由invalidate(message_t)处理,来看看具体内容吧:

-- 以下代码无任何删减改!
local function invalidate(message_t)
  if message_t.collection == "consumers" then
    cache.delete(cache.consumer_key(message_t.entity.id))
  elseif message_t.collection == "apis" then
    if message_t.entity then
      cache.delete(cache.api_key(message_t.entity.id))
    end
    cache.delete(cache.all_apis_by_dict_key())
  elseif message_t.collection == "plugins" then
    -- Handles both the update and the delete
    invalidate_plugin(message_t.old_entity and message_t.old_entity or message_t.entity)
  end
end

local function invalidate_plugin(entity)
  cache.delete(cache.plugin_key(entity.name, entity.api_id, entity.consumer_id))
end

总结一下就是清缓存,看来Kong里面核心的数据consumer、api、plugin都是做了缓存的。Serf还能做什么?集群节点的监控,比如某个节点宕机了,我们可以通知域名服务器将故障的节点去掉。

转载于:https://my.oschina.net/chinamerp/blog/850670

 类似资料: