zookeeper源码解析-admin服务

米子轩
2023-12-01

zk版本:3.5.6

1.引入

单机启动时会创建admin对象

  adminServer = AdminServerFactory.createAdminServer();
  // 设置zookeeper服务
  adminServer.setZooKeeperServer(zkServer);
  // 服务启动,监听客户端请求
  adminServer.start();

通过AdminServerFactory.createAdminServer()可知,最终会创建JettyAdminServer对象,并通过这个对象提供zk的admin服务。所以来看看这个类到底是如何运行的。

2.JettyAdminServer请求和响应

    public JettyAdminServer(String address, int port, int timeout, String commandUrl) {
        ..... // 设置jetty连接

        // jetty admin的处理逻辑是通过CommandServlet实现的,请求地址(/commands/*)
        context.addServlet(new ServletHolder(new CommandServlet()), commandUrl + "/*");
    }

    /**
     * 处理jetty admin的请求
     */
   private class CommandServlet extends HttpServlet {
        private static final long serialVersionUID = 1L;

        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
            // 请求命令就是通过指定路径实现的,比如commands/config,cmd=config
            String cmd = request.getPathInfo();
            // 获取所有的请求路径
            if (cmd == null || cmd.equals("/")) {
                // No command specified, print links to all commands instead
                for (String link : commandLinks()) {
                    response.getWriter().println(link);
                    response.getWriter().println("<br/>");
                }
                return;
            }
            cmd = cmd.substring(1);

            // 请求参数
            Map<String, String[]> parameterMap = request.getParameterMap();
            Map<String, String> kwargs = new HashMap<String, String>();
            for (Map.Entry<String, String[]> entry : parameterMap.entrySet()) {
                kwargs.put(entry.getKey(), entry.getValue()[0]);
            }

            // 请求参数作为运行命令的参数
            CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);

            // Format and print the output of the command

            // json格式输出
            CommandOutputter outputter = new JsonOutputter();
            response.setStatus(HttpServletResponse.SC_OK);
            response.setContentType(outputter.getContentType());

            // 响应指定格式的输出结果
            outputter.output(cmdResponse, response.getWriter());
        }
    }

admin服务创建和处理流程:

  1. 创建jetty server
  2. 监听请求路径commands/*,设置admin请求处理器CommandServlet
  3. 根据请求路径和请求参数,执行对象的命令(Commands保存所有的命令执行逻辑)
  4. 响应指定格式的输出结果,这里指定json格式输出

3.admin命令执行

在Commands类中保存所有的命令的定义和执行,jettyAdminServer中使用到了Commands.runCommand,我们直接从这个方法介绍:

Commands.java
----------------

 public static CommandResponse runCommand(String cmdName, ZooKeeperServer zkServer, Map<String, String> kwargs) {
     
        ....
        // 运行命令
        return commands.get(cmdName).run(zkServer, kwargs);
    }

commands在Commands类静态初始化的时候就会添加所有可以使用的命令集合。这里我们以其中一个admin请求说明问题:

http://localhost:8080/commands/configuration

此时: cmdName=configuration,通过反推很容易得到commands.get(cmdName)对应的就是org.apache.zookeeper.server.admin.Commands.ConfCommand对象。
所以看看这个类是如果处理的:

Commands.java
--------------

    public static class ConfCommand extends CommandBase {
        public ConfCommand() {
              // 通过CommandBase构造器可知,CommandBase=configuration,names=conf,config super(Arrays.asList("configuration", "conf", "config"));
        }

        @Override
        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
           // 初始化响应对象
            CommandResponse response = initializeResponse();
            // zkServer的配置 response.putAll(zkServer.getConf().toMap());
            return response;
        }
    }

可以看出:commands/configure请求最终获取到的是zkServer的配置信息。

最终响应的结果示例:

{
  "client_port" : 2181,
  "data_dir" : "D:\\tmp\\zookeeper\\version-2",
  "data_log_dir" : "D:\\tmp\\zookeeper\\version-2",
  "tick_time" : 2000,
  "max_client_cnxns" : 60,
  "min_session_timeout" : 4000,
  "max_session_timeout" : 40000,
  "server_id" : 0,
  "command" : "configuration",
  "error" : null
}

4.所有命令示例

commands/connection_stat_reset 
{
  "command" : "connection_stat_reset",
  "error" : null
}

commands/connections
{
  "connections" : [ ],
  "secure_connections" : [ ],
  "command" : "connections",
  "error" : null
}

commands/dirs
{
  "datadir_size" : 67109730,
  "logdir_size" : 67109730,
  "command" : "dirs",
  "error" : null
}

commands/dump
{
  "expiry_time_to_session_ids" : { },
  "session_id_to_ephemeral_paths" : { },
  "command" : "dump",
  "error" : null
}

commands/get_trace_mask
{
  "tracemask" : 306,
  "command" : "get_trace_mask",
  "error" : null
}

commands/is_read_only
{
  "read_only" : false,
  "command" : "is_read_only",
  "error" : null
}

commands/monitor
{
  "version" : "3.5.6-1, built on 2019-11-11",
  "avg_latency" : 0,
  "max_latency" : 0,
  "min_latency" : 0,
  "packets_received" : 0,
  "packets_sent" : 0,
  "num_alive_connections" : 0,
  "outstanding_requests" : 0,
  "server_state" : "standalone",
  "znode_count" : 5,
  "watch_count" : 0,
  "ephemerals_count" : 0,
  "approximate_data_size" : 44,
  "open_file_descriptor_count" : -1,
  "max_file_descriptor_count" : -1,
  "last_client_response_size" : -1,
  "max_client_response_size" : -1,
  "min_client_response_size" : -1,
  "command" : "monitor",
  "error" : null
}

commands/ruok 
{
  "command" : "ruok",
  "error" : null
}

commands/server_stats
{
  "version" : "3.5.6-1, built on 2019-11-11",
  "read_only" : false,
  "server_stats" : {
    "packets_sent" : 0,
    "packets_received" : 0,
    "max_latency" : 0,
    "min_latency" : 0,
    "fsync_threshold_exceed_count" : 0,
    "client_response_stats" : {
      "last_buffer_size" : -1,
      "min_buffer_size" : -1,
      "max_buffer_size" : -1
    },
    "provider_null" : false,
    "avg_latency" : 0,
    "server_state" : "standalone",
    "log_dir_size" : 67109730,
    "data_dir_size" : 67109730,
    "num_alive_client_connections" : 0,
    "last_processed_zxid" : 8,
    "outstanding_requests" : 0
  },
  "client_response" : {
    "last_buffer_size" : -1,
    "min_buffer_size" : -1,
    "max_buffer_size" : -1
  },
  "node_count" : 5,
  "command" : "server_stats",
  "error" : null
}

commands/set_trace_mask
{
  "error" : "setTraceMask requires long traceMask argument",
  "command" : "set_trace_mask"
}

commands/stat_reset
{
  "command" : "stat_reset",
  "error" : null
}

commands/stats
{
  "version" : "3.5.6-1, built on 2019-11-11",
  "read_only" : false,
  "server_stats" : {
    "packets_sent" : 0,
    "packets_received" : 0,
    "max_latency" : 0,
    "min_latency" : 0,
    "fsync_threshold_exceed_count" : 0,
    "client_response_stats" : {
      "last_buffer_size" : -1,
      "min_buffer_size" : -1,
      "max_buffer_size" : -1
    },
    "provider_null" : false,
    "avg_latency" : 0,
    "server_state" : "standalone",
    "log_dir_size" : 67109730,
    "data_dir_size" : 67109730,
    "num_alive_client_connections" : 0,
    "last_processed_zxid" : 8,
    "outstanding_requests" : 0
  },
  "client_response" : {
    "last_buffer_size" : -1,
    "min_buffer_size" : -1,
    "max_buffer_size" : -1
  },
  "node_count" : 5,
  "connections" : [ ],
  "command" : "stats",
  "error" : null
}

commands/watch_summary
{
  "num_connections" : 0,
  "num_paths" : 0,
  "num_total_watches" : 0,
  "command" : "watch_summary",
  "error" : null
}

commands/watches
{
  "session_id_to_watched_paths" : { },
  "command" : "watches",
  "error" : null
}

commands/watches_by_path
{
  "path_to_session_ids" : { },
  "command" : "watches_by_path",
  "error" : null
}
 类似资料: