ceph-mgr源码分析------MgrStandby类

鞠修雅
2023-12-01

1.MgrStandby类截取部分代码分析:

MgrStandby类主要负责三件事:向mon发送心跳(beacon)消息并随之上报系统元数据;订阅mgrmap消息;当收到的mgrmap显示自身为active时,创建并初始化Mgr实例。

MgrStandby::init()


// Bring up the standby mgr daemon: install signal handling, start the
// finisher and messenger, initialize and authenticate the MonClient,
// then initialize the objecter, client, timer and python module registry
// and run the first tick().
//
// `lock` is held from just after signal-handler registration until return.
//
// Returns 0 on success, -1 if the initial monmap cannot be built, or the
// negative value returned by monc.init() / monc.authenticate() on failure.
// On each failure path the messenger (and, once initialized, the
// MonClient) is shut down before returning.
int MgrStandby::init()
{
  init_async_signal_handler();
  register_async_signal_handler(SIGHUP, sighup_handler);

  std::lock_guard l(lock);

  // Start finisher
  finisher.start();

  // Initialize Messenger
  client_messenger->add_dispatcher_tail(this);
  client_messenger->add_dispatcher_head(&objecter);
  client_messenger->add_dispatcher_tail(&client);
  client_messenger->start();

  // Initialize MonClient
  if (monc.build_initial_monmap() < 0) {
    client_messenger->shutdown();
    client_messenger->wait();
    return -1;
  }
  // Ask the monitor for "mgrmap" updates
  monc.sub_want("mgrmap", 0, 0);

  monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
      |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);  // want keys for the mon, osd, mds and mgr entity types
  monc.set_messenger(client_messenger.get());

  // We must register our config callback before calling init(), so
  // that we see the initial configuration message
  monc.register_config_callback([this](const std::string &k, const std::string &v){	// per-option config callback
      dout(10) << "config_callback: " << k << " : " << v << dendl;
      // Only options under the "mgr/" prefix are consumed here; the prefix is
      // replaced by PyModule::config_prefix before handing the key on.
      if (k.substr(0, 4) == "mgr/") {	
	const std::string global_key = PyModule::config_prefix + k.substr(4);
        py_module_registry.handle_config(global_key, v);	// pass the module option to the python module registry

	return true;
      }
      return false;
    });
  monc.register_config_notify_callback([this]() { // config-notify callback, forwarded to the module registry
      py_module_registry.handle_config_notify();
    });
  dout(4) << "Registered monc callback" << dendl;

  int r = monc.init();	// initialize the MonClient
  if (r < 0) {
    monc.shutdown();
    client_messenger->shutdown();
    client_messenger->wait();
    return r;
  }
  mgrc.init();	// initialize the MgrClient
  client_messenger->add_dispatcher_tail(&mgrc);	// and add it as a further dispatcher on the messenger

  r = monc.authenticate();	// authenticate with the monitor
  if (r < 0) {
    derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
    monc.shutdown();
    client_messenger->shutdown();
    client_messenger->wait();
    return r;
  }
  // only forward monmap updates after authentication finishes, otherwise
  // monc.authenticate() will be waiting for MgrStandby::ms_dispatch()
  // to acquire the lock forever, as it is already locked in the beginning of
  // this method.
  monc.set_passthrough_monmap();	// safe to enable now that authentication has finished

  client_t whoami = monc.get_global_id();	// global id reported by the MonClient
  client_messenger->set_myname(entity_name_t::MGR(whoami.v));
  monc.set_log_client(&log_client);	// attach the log client to the MonClient
  _update_log_config();		// refresh the log configuration
  objecter.set_client_incarnation(0);  
  objecter.init();
  objecter.start();
  client.init();
  timer.init();		// initialize the timer

  py_module_registry.init();
  // NOTE(review): tick() is not shown here; presumably it sends the beacon
  // and re-arms itself on the timer -- confirm against its definition.
  tick();	// first run of the periodic routine

  dout(4) << "Complete." << dendl;
  return 0;
}

发送心跳消息,解析见注释

void MgrStandby::send_beacon()
{
  ceph_assert(lock.is_locked_by_me());
  dout(4) << state_str() << dendl;

  std::list<PyModuleRef> modules = py_module_registry.get_modules();  //发送心跳

  // Construct a list of the info about each loaded module
  // which we will transmit to the monitor.
  std::vector<MgrMap::ModuleInfo> module_info;  //python 模块信息
  for (const auto &module : modules) {
    MgrMap::ModuleInfo info;
    info.name = module->get_name();  //获取插件名称
    info.error_string = module->get_error_string(); //错误信息
    info.can_run = module->get_can_run();   //
    info.module_options = module->get_options();
    module_info.push_back(std::move(info));
  }

  // Whether I think I am available (request MgrMonitor to set me
  // as available in the map)
  bool available = active_mgr != nullptr && active_mgr->is_initialized();  //检测自身是否可用,active_mgr为mgr

  auto addrs = available ? active_mgr->get_server_addrs() : entity_addrvec_t(); //通过messgaer获取服务自身地址
  dout(10) << "sending beacon as gid " << monc.get_global_id() << dendl;

  map<string,string> metadata;  
  metadata["addr"] = client_messenger->get_myaddr_legacy().ip_only_to_str();
  metadata["addrs"] = stringify(client_messenger->get_myaddrs());
  collect_sys_info(&metadata, g_ceph_context);    //收集系统信息,存入metadata,有ceph_version,内核信息等

  MMgrBeacon *m = new MMgrBeacon(monc.get_fsid(),   //获取文件系统ID
				 monc.get_global_id(),                //获取monc全局ID
                                 g_conf()->name.get_id(),  
                                 addrs,   //地址
                                 available,   //可用状态
				 std::move(module_info),    //模块信息
				 std::move(metadata));      //系统信息

  if (available) {  
    if (!available_in_map) { //init()之后通知mon已经完成初始化
      // We are informing the mon that we are done initializing: inform
      // it of our command set.  This has to happen after init() because
      // it needs the python modules to have loaded.
      std::vector<MonCommand> commands = mgr_commands;
      std::vector<MonCommand> py_commands = py_module_registry.get_commands();  //
      commands.insert(commands.end(), py_commands.begin(), py_commands.end());
      m->set_command_descs(commands);
      dout(4) << "going active, including " << m->get_command_descs().size()
              << " commands in beacon" << dendl;
    }

    m->set_services(active_mgr->get_services());  //设置服务Python模块的服务列表,名称和URL
  }
                                 
  monc.send_mon_message(m); //发送
}

处理mgr_map消息:


// React to a new MgrMap from the monitor.
//
// - The python module registry sees the map first and may request a respawn.
// - If the map names us as active: create and background-initialize a Mgr on
//   the first such map, otherwise pass the map on to the existing Mgr; then
//   record when the map starts reporting us as available.
// - If we were active but the map no longer says so: respawn.
// - If someone else is active: make sure standby-mode modules are running.
void MgrStandby::handle_mgr_map(MMgrMap* mmap)
{
  auto &mgr_map = mmap->get_map();
  dout(4) << "received map epoch " << mgr_map.get_epoch() << dendl;
  const bool active_in_map = mgr_map.active_gid == monc.get_global_id();
  dout(4) << "active in map: " << active_in_map
          << " active is " << mgr_map.active_gid << dendl;

  // PyModuleRegistry may ask us to respawn if it sees that
  // this MgrMap is changing its set of enabled modules
  if (py_module_registry.handle_mgr_map(mgr_map)) {
    respawn();
  }

  if (!active_in_map) {
    if (active_mgr != nullptr) {
      derr << "I was active but no longer am" << dendl;
      respawn();
      return;
    }

    // I am the standby and someone else is active, start modules
    // in standby mode to do redirects if needed
    const bool someone_else_active =
      mgr_map.active_gid != 0 && mgr_map.active_name != g_conf()->name.get_id();
    if (someone_else_active && !py_module_registry.is_standby_running()) {
      py_module_registry.standby_start(monc, finisher);
    }
    return;
  }

  // From here on the map says we are the active mgr.
  if (active_mgr) {
    dout(10) << "I was already active" << dendl;
    if (active_mgr->got_mgr_map(mgr_map)) {
      respawn();
    }
  } else {
    // First map naming us active: no Mgr exists yet, so create one and
    // start its initialization in the background.
    dout(1) << "Activating!" << dendl;
    active_mgr.reset(new Mgr(&monc, mgr_map, &py_module_registry,
                             client_messenger.get(), &objecter,
                             &client, clog, audit_clog));
    active_mgr->background_init(new FunctionContext(
          [this](int r){
            // Advertise our active-ness ASAP instead of waiting for
            // next tick.
            std::lock_guard guard(lock);
            send_beacon();
          }));
    dout(1) << "I am now activating" << dendl;
  }

  if (!available_in_map && mgr_map.get_available()) {
    dout(4) << "Map now says I am available" << dendl;
    available_in_map = true;
  }
}

MgrStandby::ms_dispatch()处理收到的消息:若是mgrmap消息,先交给handle_mgr_map()处理;若Mgr已激活,再将消息转交给Mgr的ms_dispatch()处理:

// Dispatch entry point for MgrStandby.
//
// MSG_MGR_MAP messages are first processed by handle_mgr_map(). If an active
// Mgr exists, the message is then offered to it with `lock` released for the
// duration of that call.
//
// Returns the active Mgr's ms_dispatch() result, or false when there is no
// active Mgr. MSG_MGR_MAP always returns false so that the message is not
// reported as handled and can still reach mgrc.
bool MgrStandby::ms_dispatch(Message *m)
{
  std::lock_guard l(lock);
  dout(4) << state_str() << " " << *m << dendl;

  // Handle map updates ourselves first
  if (m->get_type() == MSG_MGR_MAP) {
    handle_mgr_map(static_cast<MMgrMap*>(m));
  }
  bool handled = false;
  if (active_mgr) {   // an active Mgr exists
    // Take a local copy of active_mgr before dropping the lock; presumably
    // this keeps the Mgr reachable if active_mgr changes meanwhile -- confirm.
    auto am = active_mgr;
    lock.Unlock();
    handled = am->ms_dispatch(m); // let the active Mgr handle the message
    lock.Lock();  // re-acquire so the lock_guard releases a held lock on return
  }
  // NOTE(review): m is read again after am->ms_dispatch(m); confirm the
  // callee never releases the message before returning.
  if (m->get_type() == MSG_MGR_MAP) {
    // let this pass through for mgrc
    handled = false;
  }
  return handled;
}
 类似资料: