MgrStandby类主要是发送心跳消息给mon,同时报告系统元数据,订阅mgr_map消息,根据mgr_map消息中自身是否激活来启动mgr
MgrStandby::init()
int MgrStandby::init()
{
init_async_signal_handler();
register_async_signal_handler(SIGHUP, sighup_handler);
std::lock_guard l(lock);
// Start finisher
finisher.start();
// Initialize Messenger
client_messenger->add_dispatcher_tail(this);
client_messenger->add_dispatcher_head(&objecter);
client_messenger->add_dispatcher_tail(&client);
client_messenger->start();
// Initialize MonClient
if (monc.build_initial_monmap() < 0) {
client_messenger->shutdown();
client_messenger->wait();
return -1;
}
//订阅消息
monc.sub_want("mgrmap", 0, 0);
monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
|CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR); //订阅mon,osd,mds.mgr
monc.set_messenger(client_messenger.get());
// We must register our config callback before calling init(), so
// that we see the initial configuration message
monc.register_config_callback([this](const std::string &k, const std::string &v){ //注册Mon客户端配置回调
dout(10) << "config_callback: " << k << " : " << v << dendl;
if (k.substr(0, 4) == "mgr/") {
const std::string global_key = PyModule::config_prefix + k.substr(4);
py_module_registry.handle_config(global_key, v); //Python插件注册器处理插件的配置
return true;
}
return false;
});
monc.register_config_notify_callback([this]() { //monc注册配置通知回调
py_module_registry.handle_config_notify();
});
dout(4) << "Registered monc callback" << dendl;
int r = monc.init(); //monc初始化
if (r < 0) {
monc.shutdown();
client_messenger->shutdown();
client_messenger->wait();
return r;
}
mgrc.init(); //mgrclient初始化
client_messenger->add_dispatcher_tail(&mgrc); //
r = monc.authenticate(); //monc获取权限
if (r < 0) {
derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
monc.shutdown();
client_messenger->shutdown();
client_messenger->wait();
return r;
}
// only forward monmap updates after authentication finishes, otherwise
// monc.authenticate() will be waiting for MgrStandy::ms_dispatch()
// to acquire the lock forever, as it is already locked in the beginning of
// this method.
monc.set_passthrough_monmap(); //monc身份验证完成获取锁
client_t whoami = monc.get_global_id(); //得到monc 的ID
client_messenger->set_myname(entity_name_t::MGR(whoami.v));
monc.set_log_client(&log_client); //设置日志客户端
_update_log_config(); //更新日志配置
objecter.set_client_incarnation(0);
objecter.init();
objecter.start();
client.init();
timer.init(); //定时器初始化
py_module_registry.init();
//周期性发送becon message等
tick(); //周期函数
dout(4) << "Complete." << dendl;
return 0;
}
发送心跳消息,解析见注释
void MgrStandby::send_beacon()
{
ceph_assert(lock.is_locked_by_me());
dout(4) << state_str() << dendl;
std::list<PyModuleRef> modules = py_module_registry.get_modules(); //发送心跳
// Construct a list of the info about each loaded module
// which we will transmit to the monitor.
std::vector<MgrMap::ModuleInfo> module_info; //python 模块信息
for (const auto &module : modules) {
MgrMap::ModuleInfo info;
info.name = module->get_name(); //获取插件名称
info.error_string = module->get_error_string(); //错误信息
info.can_run = module->get_can_run(); //
info.module_options = module->get_options();
module_info.push_back(std::move(info));
}
// Whether I think I am available (request MgrMonitor to set me
// as available in the map)
bool available = active_mgr != nullptr && active_mgr->is_initialized(); //检测自身是否可用,active_mgr为mgr
auto addrs = available ? active_mgr->get_server_addrs() : entity_addrvec_t(); //通过messgaer获取服务自身地址
dout(10) << "sending beacon as gid " << monc.get_global_id() << dendl;
map<string,string> metadata;
metadata["addr"] = client_messenger->get_myaddr_legacy().ip_only_to_str();
metadata["addrs"] = stringify(client_messenger->get_myaddrs());
collect_sys_info(&metadata, g_ceph_context); //收集系统信息,存入metadata,有ceph_version,内核信息等
MMgrBeacon *m = new MMgrBeacon(monc.get_fsid(), //获取文件系统ID
monc.get_global_id(), //获取monc全局ID
g_conf()->name.get_id(),
addrs, //地址
available, //可用状态
std::move(module_info), //模块信息
std::move(metadata)); //系统信息
if (available) {
if (!available_in_map) { //init()之后通知mon已经完成初始化
// We are informing the mon that we are done initializing: inform
// it of our command set. This has to happen after init() because
// it needs the python modules to have loaded.
std::vector<MonCommand> commands = mgr_commands;
std::vector<MonCommand> py_commands = py_module_registry.get_commands(); //
commands.insert(commands.end(), py_commands.begin(), py_commands.end());
m->set_command_descs(commands);
dout(4) << "going active, including " << m->get_command_descs().size()
<< " commands in beacon" << dendl;
}
m->set_services(active_mgr->get_services()); //设置服务Python模块的服务列表,名称和URL
}
monc.send_mon_message(m); //发送
}
处理mgr_map消息:
void MgrStandby::handle_mgr_map(MMgrMap* mmap)
{
auto &map = mmap->get_map();
dout(4) << "received map epoch " << map.get_epoch() << dendl;
const bool active_in_map = map.active_gid == monc.get_global_id();
dout(4) << "active in map: " << active_in_map
<< " active is " << map.active_gid << dendl;
// PyModuleRegistry may ask us to respawn if it sees that
// this MgrMap is changing its set of enabled modules
bool need_respawn = py_module_registry.handle_mgr_map(map); //py模块注册器处理map
if (need_respawn) { //是否需要重新加载
respawn();
}
if (active_in_map) { //
if (!active_mgr) {
//第一次active_mgr 为空,这里会新建一个mgr类来处理,从这里可以知道ceph组件更新的信息都是在mgr这个类中处理的。
dout(1) << "Activating!" << dendl;
active_mgr.reset(new Mgr(&monc, map, &py_module_registry,
client_messenger.get(), &objecter,
&client, clog, audit_clog));
active_mgr->background_init(new FunctionContext( //初始化mgr
[this](int r){
// Advertise our active-ness ASAP instead of waiting for
// next tick.
std::lock_guard l(lock);
send_beacon();
}));
dout(1) << "I am now activating" << dendl;
} else {
dout(10) << "I was already active" << dendl;
bool need_respawn = active_mgr->got_mgr_map(map);
if (need_respawn) {
respawn();
}
}
if (!available_in_map && map.get_available()) {
dout(4) << "Map now says I am available" << dendl;
available_in_map = true;
}
} else if (active_mgr != nullptr) {
derr << "I was active but no longer am" << dendl;
respawn();
} else {
if (map.active_gid != 0 && map.active_name != g_conf()->name.get_id()) {
// I am the standby and someone else is active, start modules
// in standby mode to do redirects if needed
if (!py_module_registry.is_standby_running()) {
py_module_registry.standby_start(monc, finisher);
}
}
}
}
MgrStandby处理消息,调用Mgr消息调度函数:
bool MgrStandby::ms_dispatch(Message *m)
{
std::lock_guard l(lock);
dout(4) << state_str() << " " << *m << dendl;
if (m->get_type() == MSG_MGR_MAP) {
handle_mgr_map(static_cast<MMgrMap*>(m));
}
bool handled = false;
if (active_mgr) { //如果mgr 激活
auto am = active_mgr;
lock.Unlock();
handled = am->ms_dispatch(m); //让mgr处理消息
lock.Lock();
}
if (m->get_type() == MSG_MGR_MAP) {
// let this pass through for mgrc
handled = false;
}
return handled;
}