ceph源码分析 --MonitorDB

闻枫
2023-12-01

1.概述

ceph monitor的一个主要功能是使用paxos分布式协议维护一个key/value数据库的一致性(最主要的就是各个map的一致性,对于monitor而言,即monmap)。12.2.2版本所使用的数据库引擎从原来的leveldb转变为了rocksdb。

之前有一个疑惑,monitor在部署的时候,可以通过ceph.conf文件mon_host的ip获取,那么monmap是否根据静态的ceph.conf文件保持一致呢?
看过paxos算法才知道,答案是否定的,它依赖于monmap文件。(每次map文件变更前, 需要多数monitor节点同意, 确保多数或所有(quorum)节点的map文件一致, 并且map文件是增量更新的, 每次更新产生一个新的version, 所以当mon节点离线再加入时, 需要同步增量部分的信息)

2.MonitorDBStore类

说起维护monitor的数据库一致性,不得不说MonitorDBStore类是如何抽象对k/v数据库的操作的。从MonitorDBStore类的定义出发,一步一步探测它的具体实现:

//每一个op可以看做一个原子性的key/value批量更新
struct Op {
    uint8_t type;
    string prefix;//为了各个模块不相互干扰,每个模块会选择一个前缀
    string key, endkey;//prefix+key构成了各个模块独立的key
    bufferlist bl;

这些op将形成事务被encode,decode。

//Transaction类来说明一个事务包含的所有操作(put erase compact)
 struct Transaction;
  typedef ceph::shared_ptr<Transaction> TransactionRef;
  struct Transaction {
    list<Op> ops;
    uint64_t bytes, keys;
    Transaction() : bytes(0), keys(0) {}
    enum {
      OP_PUT    = 1,//设置 key value
      OP_ERASE  = 2,//移除
      OP_COMPACT = 3,//压缩(compact):对指定prefix或其中的key范围做compaction
    };

为了在服务器间的传送,所有的transaction都会经过encode,decode。

    // Serialize this transaction into `bl`: the op list first, then the
    // `bytes` and `keys` counters, framed by ENCODE_START/ENCODE_FINISH.
    // NOTE(review): the (2, 1) arguments are presumably the struct version
    // and the oldest compatible version -- confirm against the macro.
    void encode(bufferlist& bl) const {
      ENCODE_START(2, 1, bl);
      ::encode(ops, bl);
      // decode() only reads the next two fields when struct_v >= 2.
      ::encode(bytes, bl);
      ::encode(keys, bl);
      ENCODE_FINISH(bl);
    }

    // Rebuild this transaction from `bl`, reading fields in the same order
    // encode() writes them: ops, then bytes and keys.
    void decode(bufferlist::iterator& bl) {
      DECODE_START(2, bl);
      ::decode(ops, bl);
      // bytes/keys are only read for struct_v >= 2; for older encodings they
      // are left untouched. (struct_v is presumably introduced by
      // DECODE_START -- confirm against the macro.)
      if (struct_v >= 2) {
    ::decode(bytes, bl);
    ::decode(keys, bl);
      }
      DECODE_FINISH(bl);
    }

接下来就是ceph 后端的处理:

// Apply every op of transaction `t` to the backing KeyValueDB (abridged
// listing; some original lines are elided with "...").
//
// PUT and ERASE ops are staged into a KeyValueDB transaction and submitted
// synchronously. COMPACT ops are only collected while iterating and are
// issued (asynchronously) after the submit succeeded.
//
// Returns the result of submit_transaction_sync(); a negative result trips
// the assert below before the return is reached.
int apply_transaction(MonitorDBStore::TransactionRef t) {
    KeyValueDB::Transaction dbt = db->get_transaction();
    ...
    // Pending compactions collected from the ops: (prefix, (key, endkey)).
        list<pair<string, pair<string,string> > > compact;
        // Translate each Op into the matching KeyValueDB call.
    for (list<Op>::const_iterator it = t->ops.begin();
     it != t->ops.end();
     ++it) {
      const Op& op = *it;
      switch (op.type) {
      case Transaction::OP_PUT:
    dbt->set(op.prefix, op.key, op.bl);
    break;
      case Transaction::OP_ERASE:
    dbt->rmkey(op.prefix, op.key);
    break;
      case Transaction::OP_COMPACT:
    // Deferred: compaction is requested only after the write was submitted.
    compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
    break;
      default:
    // An unknown op type is treated as fatal.
    derr << __func__ << " unknown op type " << op.type << dendl;
    ceph_abort();
    break;
      }
    }
    // Submit the staged set/rmkey operations synchronously.
    int r = db->submit_transaction_sync(dbt);
    if (r >= 0) {
      // Drain the deferred compactions in the order they were queued.
      while (!compact.empty()) {
    // Empty key and endkey means "compact the whole prefix"; otherwise
    // compact just the [key, endkey] range under that prefix.
    if (compact.front().second.first == string() &&
        compact.front().second.second == string())
      db->compact_prefix_async(compact.front().first);
    else
      db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
    compact.pop_front();
      }
    } else {
      // A failed write to the store is not tolerated.
      assert(0 == "failed to write to db");
    }
    return r;
  }

不同prefix对应的transaction 间使用queue_transaction函数进行异步处理。

  // Queue transaction `t` on io_work instead of applying it inline: the
  // transaction and the `oncommit` context are wrapped in a C_DoTransaction,
  // which is then handed to io_work.queue().
  void queue_transaction(MonitorDBStore::TransactionRef t,
             Context *oncommit) {
    auto *pending = new C_DoTransaction(this, t, oncommit);
    io_work.queue(pending);
  }

而多种prefix对应的transaction的获取通过get_iterator函数,而每种transaction对应的key value则通过get函数。
其他的函数不在此一一解释。

3.MonitorDBstore数据变化

从monitor启动,我们来看一下MonitorDBstore发生的一系列基于key/value的更新。
每次monitor启动时都会按照monmap中的服务器地址去连接其他monitor服务器,并同步数据。这里有两种情况,一种是db中不存在monmap(需要执行mkfs重新产生),另一种是已经添加过的mon节点由于网络或者其他原因异常,恢复正常后的重启(直接从db中读取)。
很明显所有的节点都会从无到有,因此这里的两种情况其实只是单纯地区分获取monmap的方式而已。注意一点:monmap很重要,一旦产生,之后的启动不会再重新配置,因此一定要确保配置的正确性。

接下来先讲一下mkfs的大致流程:

if (mkfs) {

        // resolve public_network -> public_addr
    pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);

    common_init_finish(g_ceph_context);

    bufferlist monmapbl, osdmapbl;
    std::string error;
    MonMap monmap;
// load or generate monmap
...
          try {
    monmap.decode(monmapbl);
    // always mark seed/mkfs monmap as epoch 0
    monmap.set_epoch(0);
      }
      ...
            ostringstream oss;
       //根据ceph.conf对该节点的monmap进行build init
      int err = monmap.build_initial(g_ceph_context, oss);
     ...
   }

具体的实现过程如下:

int MonMap::build_initial(CephContext *cct, ostream& errout)
{
  const md_config_t *conf = cct->_conf;
  ...
  // -m foo?根据ceph.conf中noname的mon_host生成加入的mon的ip
  if (!conf->mon_host.empty()) {
    int r = build_from_host_list(conf->mon_host, "noname-");
      // What monitors are in the config file?
  std::vector <std::string> sections;
  int ret = conf->get_all_sections(sections);
  if (ret) {
    errout << "Unable to find any monitors in the configuration "
         << "file, because there was an error listing the sections. error "
     << ret << std::endl;
    return -ENOENT;
  }
  std::vector <std::string> mon_names;
  for (std::vector <std::string>::const_iterator s = sections.begin();
       s != sections.end(); ++s) {
    if ((s->substr(0, 4) == "mon.") && (s->size() > 4)) {
      mon_names.push_back(s->substr(4));
    }
  }

  // Find an address for each monitor in the config file.
  for (std::vector <std::string>::const_iterator m = mon_names.begin();
       m != mon_names.end(); ++m) {
    std::vector <std::string> sections;
    std::string m_name("mon");
    m_name += ".";
    m_name += *m;
    sections.push_back(m_name);
    sections.push_back("mon");
    sections.push_back("global");
    std::string val;
    int res = conf->get_val_from_conf_file(sections, "mon addr", val, true);
    if (res) {
      errout << "failed to get an address for mon." << *m << ": error "
       << res << std::endl;
      continue;
    }
    entity_addr_t addr;
    if (!addr.parse(val.c_str())) {
      errout << "unable to parse address for mon." << *m
       << ": addr='" << val << "'" << std::endl;
      continue;
    }
    if (addr.get_port() == 0)
      addr.set_port(CEPH_MON_PORT);

    uint16_t priority = 0;
    if (!conf->get_val_from_conf_file(sections, "mon priority", val, false)) {
      try {
        priority = std::stoul(val);
      } catch (std::logic_error&) {
        errout << "unable to parse priority for mon." << *m
               << ": priority='" << val << "'" << std::endl;
        continue;
      }
    }
    // the make sure this mon isn't already in the map
    if (contains(addr))
      remove(get_name(addr));
    if (contains(*m))
      remove(*m);
    add(mon_info_t{*m, addr, priority});
  }

  if (size() == 0) {
    // no info found from conf options lets try use DNS SRV records
    string srv_name = conf->mon_dns_srv_name;
    string domain;
    // check if domain is also provided and extract it from srv_name
    size_t idx = srv_name.find("_");
    if (idx != string::npos) {
      domain = srv_name.substr(idx + 1);
      srv_name = srv_name.substr(0, idx);
    }

    map<string, DNSResolver::Record> records;
    if (DNSResolver::get_instance()->resolve_srv_hosts(cct, srv_name,
        DNSResolver::SRV_Protocol::TCP, domain, &records) != 0) {

      errout << "unable to get monitor info from DNS SRV with service name: " << 
       "ceph-mon" << std::endl;
    }
    else {
      for (const auto& record : records) {
        add(mon_info_t{record.first,
                       record.second.addr,
                       record.second.priority});
      }
    }
  }

  if (size() == 0) {
    errout << "no monitors specified to connect to." << std::endl;
    return -ENOENT;
  }
  created = ceph_clock_now();
  last_changed = created;
  return 0;
}

就这样新的mon产生了,接下来为了以后直接读取,将该monitor的信息存入到 MonitorDBStore的db中

 MonitorDBStore store(g_conf->mon_data);
    ostringstream oss;
    //默认打开方式rocksdb
    int r = store.create_and_open(oss);
    //创建mon实例
    Monitor mon(g_ceph_context, g_conf->name.get_id(), &store, 0, 0, &monmap);
    新建本地monmap
    MonitorDBStore *store = new MonitorDBStore(g_conf->mon_data);
  {
    ostringstream oss;
    err = store->open(oss);
    if (oss.tellp())
      derr << oss.str() << dendl;
    if (err < 0) {
      derr << "error opening mon data directory at '"
           << g_conf->mon_data << "': " << cpp_strerror(err) << dendl;
      prefork.exit(1);
    }
  }

  bufferlist magicbl;
  err = store->get(Monitor::MONITOR_NAME, "magic", magicbl);
  if (err || !magicbl.length()) {
    derr << "unable to read magic from mon data" << dendl;
    prefork.exit(1);
  }
  string magic(magicbl.c_str(), magicbl.length()-1);  // ignore trailing \n
  if (strcmp(magic.c_str(), CEPH_MON_ONDISK_MAGIC)) {
    derr << "mon fs magic '" << magic << "' != current '" << CEPH_MON_ONDISK_MAGIC << "'" << dendl;
    prefork.exit(1);
  }

  err = Monitor::check_features(store);
  if (err < 0) {
    derr << "error checking features: " << cpp_strerror(err) << dendl;
    prefork.exit(1);
  }

  // inject new monmap?
  if (!inject_monmap.empty()) {
    bufferlist bl;
    std::string error;
    int r = bl.read_file(inject_monmap.c_str(), &error);
    if (r) {
      derr << "unable to read monmap from " << inject_monmap << ": "
       << error << dendl;
      prefork.exit(1);
    }
    // get next version
    version_t v = store->get("monmap", "last_committed");
    dout(0) << "last committed monmap epoch is " << v << ", injected map will be " << (v+1)
            << dendl;
    v++;

    // set the version
    MonMap tmp;
    tmp.decode(bl);
    if (tmp.get_epoch() != v) {
      dout(0) << "changing monmap epoch from " << tmp.get_epoch()
           << " to " << v << dendl;
      tmp.set_epoch(v);
    }
    bufferlist mapbl;
    tmp.encode(mapbl, CEPH_FEATURES_ALL);
    bufferlist final;
    ::encode(v, final);
    ::encode(mapbl, final);

    auto t(std::make_shared<MonitorDBStore::Transaction>());
    // save it
    t->put("monmap", v, mapbl);
    t->put("monmap", "latest", final);
    t->put("monmap", "last_committed", v);
    store->apply_transaction(t);

    dout(0) << "done." << dendl;
    prefork.exit(0);
  }

  // monmap?
  MonMap monmap;
  {
    // note that even if we don't find a viable monmap, we should go ahead
    // and try to build it up in the next if-else block.
    bufferlist mapbl;
    int err = obtain_monmap(*store, mapbl);
    if (err >= 0) {
      try {
        monmap.decode(mapbl);
      } catch (const buffer::error& e) {
        derr << "can't decode monmap: " << e.what() << dendl;
      }
    } else {
      derr << "unable to obtain a monmap: " << cpp_strerror(err) << dendl;
    }
    if (!extract_monmap.empty()) {
      int r = mapbl.write_file(extract_monmap.c_str());
      if (r < 0) {
    r = -errno;
    derr << "error writing monmap to " << extract_monmap << ": " << cpp_strerror(r) << dendl;
    prefork.exit(1);
      }
      derr << "wrote monmap to " << extract_monmap << dendl;
      prefork.exit(0);
    }
  }

//之后就是绑定msg等,此处不再赘述。

想必大家已经知晓,并非新加入的monitor节点,其monmap是直接从db中获取的了。。。哈哈
接下来便是我们非常重要的一节,monitor启动时的数据同步。

4.各个节点的monmap同步。

每次monitor server启动时都会按照monmap中的服务器地址去连接其他monitor服务器,并同步数据。这个过程叫做bootstrap()。bootstrap的第一个目的是补全数据,从其他服务器拉取缺失的paxos log或者全量复制数据库,其次是在必要时形成多数派建立一个paxos集群或者加入到已有的多数派中。

// Monitor start-up (abridged listing; the earlier steps are elided with "..").
// Of the visible steps, it kicks off bootstrap() and then records this
// monitor's own connection features in the session feature map.
// Returns 0 on the path shown here.
int Monitor::init()
{
  ..
  bootstrap();// start probing the other monitors (data sync / quorum formation follow from there)
  // add features of myself into feature_map
  session_map.feature_map.add_mon(con_self->get_features());
  return 0;
}

看一下bootstrap()的具体实现:

// (Re)start the probing phase (abridged listing; leading lines are elided
// with ".."). Resets the monitor to STATE_PROBING, optionally compacts the
// store, and then sends an OP_PROBE message to every other monitor in the
// monmap and to every address in extra_probe_peers. A single-monitor map
// short-circuits into win_standalone_election().
void Monitor::bootstrap()
{
  ..
    // reset
  state = STATE_PROBING;

  _reset();

  // optionally compact the store first (controlled by mon_compact_on_bootstrap)
  if (g_conf->mon_compact_on_bootstrap) {
    dout(10) << "bootstrap -- triggering compaction" << dendl;
    store->compact();
    dout(10) << "bootstrap -- finished compaction" << dendl;
  }

  // singleton monitor? nobody to probe, so win the election right away
  if (monmap->size() == 1 && rank == 0) {
    win_standalone_election();
    return;
  }

  reset_probe_timeout();

  // i'm outside the quorum
  if (monmap->contains(name))
    outside_quorum.insert(name);

  // probe monitors: every monmap member except ourselves (our own rank)
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
                  monmap->get_inst(i));
  }
  // also probe the extra peer addresses, skipping our own address
  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
       p != extra_probe_peers.end();
       ++p) {
    if (*p != messenger->get_myaddr()) {
      entity_inst_t i;
      // only the address is known for these peers, so MON(-1) is used as the name
      i.name = entity_name_t::MON(-1);
      i.addr = *p;
      // send the probe to this additional known peer
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
    }
  }
}

接下来所有收到OP_PROBE的节点,根据handle_probe中定义的op类型,采用函数handle_probe_probe开始处理了:

// Handle an incoming OP_PROBE (abridged listing; the prologue that obtains
// `m` and the `out:` epilogue are elided with "...").
// Either re-bootstraps because the prober's first paxos version is ahead of
// our last version, or answers with an OP_REPLY carrying our name, quorum,
// encoded monmap and paxos first/last versions.
void Monitor::handle_probe_probe(MonOpRequestRef op)
{
...
  if (!is_probing() && !is_synchronizing()) {
    // If the probing mon is way ahead of us, we need to re-bootstrap.
    // Normally we capture this case when we initially bootstrap, but
    // it is possible we pass those checks (we overlap with
    // quorum-to-be) but fail to join a quorum before it moves past
    // us.  We need to be kicked back to bootstrap so we can
    // synchronize, not keep calling elections.
    if (paxos->get_version() + 1 < m->paxos_first_version) {
      dout(1) << " peer " << m->get_source_addr() << " has first_committed "
          << "ahead of us, re-bootstrapping" << dendl;
      bootstrap();
      // The version gap is too large to be repaired through paxos itself, so
      // restart bootstrap and pull the data from the peer instead.
      goto out;

    }
  }
  // Normal path: report our paxos state; the key values are the first and
  // last committed versions.
  // NOTE(review): the article's author contrasts this with paxos recovery,
  // said to rely on proposal numbers (last_pn/first_pn) -- not visible here.
  MMonProbe *r;
  r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
  r->name = name;
  r->quorum = quorum;
  // encode our monmap using the feature set of the prober's connection
  monmap->encode(r->monmap_bl, m->get_connection()->get_features());
  r->paxos_first_version = paxos->get_first_committed();
  r->paxos_last_version = paxos->get_version();
  m->get_connection()->send_message(r);
...
 }

发送探测包的monitor节点,在最大超时时间内收到OP_REPLY,会调用handle_probe_reply函数进行下一步处理。

void Monitor::handle_probe_reply(MonOpRequestRef op)
{
  MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
  dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
  dout(10) << " monmap is " << *monmap << dendl;
  // discover name and addrs during probing or electing states.
  if (!is_probing() && !is_electing()) {
    return;
  }
  // newer map, or they've joined a quorum and we haven't?
  bufferlist mybl;
  monmap->encode(mybl, m->get_connection()->get_features());
  // make sure it's actually different; the checks below err toward
  // taking the other guy's map, which could cause us to loop.
  if (!mybl.contents_equal(m->monmap_bl)) {
    MonMap *newmap = new MonMap;
    newmap->decode(m->monmap_bl);
    if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
                   !has_ever_joined)) {
      dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
           << ", mine was " << monmap->get_epoch() << dendl;
      delete newmap;
      monmap->decode(m->monmap_bl);

      bootstrap();
      return;
    }
    delete newmap;
  }

  // rename peer?
  string peer_name = monmap->get_name(m->get_source_addr());
  if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
    dout(10) << " renaming peer " << m->get_source_addr() << " "
         << peer_name << " -> " << m->name << " in my monmap"
         << dendl;
    monmap->rename(peer_name, m->name);

    if (is_electing()) {
      bootstrap();
      return;
    }
  } else {
    dout(10) << " peer name is " << peer_name << dendl;
  }

  // new initial peer?
  if (monmap->get_epoch() == 0 &&
      monmap->contains(m->name) &&
      monmap->get_addr(m->name).is_blank_ip()) {
    dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
    monmap->set_addr(m->name, m->get_source_addr());

    bootstrap();
    return;
  }

  // end discover phase
  if (!is_probing()) {
    return;
  }

  assert(paxos != NULL);

  if (is_synchronizing()) {
    dout(10) << " currently syncing" << dendl;
    return;
  }

  entity_inst_t other = m->get_source_inst();
  //同步情况1:一种情况是我的最后一条log记录和对方的第一条log记录之间有空隙,中间有缺失,只能主动从对方拉数据。
      if (paxos->get_version() < m->paxos_first_version &&
    m->paxos_first_version > 1) {  // no need to sync if we're 0 and they start at 1.
      dout(10) << " peer paxos first versions [" << m->paxos_first_version
           << "," << m->paxos_last_version << "]"
           << " vs my version " << paxos->get_version()
           << " (too far ahead)"
           << dendl;
      cancel_probe_timeout();
      sync_start(other, true);//开始同步
      return;
    }
    //同步情况2:根据配置变量paxos_max_join_drift,数据并没有缺失,但是落后的log条数超过了这个阈值,不再依靠paxos逐条追赶,而是走sync流程从对方同步最近的数据(注意这里sync_start的full参数为false,即recent同步而非全量同步)。

    if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
      dout(10) << " peer paxos last version " << m->paxos_last_version
           << " vs my version " << paxos->get_version()
           << " (too far ahead)"
           << dendl;
      cancel_probe_timeout();
      sync_start(other, false);
      return;
    }
  }

  // is there an existing quorum?
  if (m->quorum.size()) {// 多数派列表非空
    dout(10) << " existing quorum " << m->quorum << dendl;

    dout(10) << " peer paxos version " << m->paxos_last_version
             << " vs my version " << paxos->get_version()
             << " (ok)"
             << dendl;

    if (monmap->contains(name) &&
        !monmap->get_addr(name).is_blank_ip()) {
      // 我的地址他们都知道了, 通过start_election选举后可以加入多数派

      start_election();
    } else {
    //如果对方也不是当前多数派的一员,并且是属于monmap的一员,那么把它
    //列入到在多数派外面的人
    if (monmap->contains(m->name)) {
      dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
      outside_quorum.insert(m->name);
    } else {
      dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
      m->put();
      return;
    }

    //一旦发现不在多数派中的节点数达到 monmap->size()/2 + 1 (即过半数,对于2F+1个节点的集群就是F+1个,包括自己),
    //说明集群不存在多数派,就可以通过选举来形成多数派
    unsigned need = monmap->size() / 2 + 1;
    dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
    if (outside_quorum.size() >= need) {
      if (outside_quorum.count(name)) {
        dout(10) << " that's enough to form a new quorum, calling election" << dendl;
        start_election();
      } else {
        dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
      }
    } else {
      dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
    }
  }  }
}

接下来就是数据的同步部分了:

// Begin synchronizing our store from the monitor `other`.
//
//   other  peer the sync request is sent to; stored as sync_provider
//   full   true  -> request a full sync (OP_GET_COOKIE_FULL); critical state
//                   is stashed and the sync target prefixes are cleared first
//          false -> request only recent data (OP_GET_COOKIE_RECENT), telling
//                   the provider our current paxos version
//
// May only be entered from STATE_PROBING or STATE_SYNCHRONIZING; the state
// becomes STATE_SYNCHRONIZING.
void Monitor::sync_start(entity_inst_t &other, bool full)
{
  const char *mode = full ? " full" : " recent";
  dout(10) << __func__ << " " << other << mode << dendl;

  assert(state == STATE_PROBING || state == STATE_SYNCHRONIZING);
  state = STATE_SYNCHRONIZING;

  // a sync requester must not act as a sync provider at the same time
  sync_reset_provider();

  sync_full = full;

  if (sync_full) {
    // Persist the critical state plus the "sync in progress" marker before
    // anything gets wiped.
    auto txn = std::make_shared<MonitorDBStore::Transaction>();
    sync_stash_critical_state(txn);
    txn->put("mon_sync", "in_sync", 1);

    // the floor never moves backwards
    sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
    dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
         << sync_last_committed_floor << dendl;
    txn->put("mon_sync", "last_committed_floor", sync_last_committed_floor);

    store->apply_transaction(txn);

    assert(g_conf->mon_sync_requester_kill_at != 1);

    // wipe every prefix that is going to be re-fetched from the provider
    set<string> prefixes = get_sync_targets_names();
    dout(10) << __func__ << " clearing prefixes " << prefixes << dendl;
    store->clear(prefixes);

    // Let paxos re-read its (now reset) state, so that a later bootstrap
    // with a different probe reply order cannot conclude that only a
    // partial sync, or none at all, is required.
    paxos->init();

    assert(g_conf->mon_sync_requester_kill_at != 2);
  }

  // Treat 'other' as the leader for now; this is corrected once the reply
  // to the sync start arrives.
  sync_provider = other;

  sync_reset_timeout();

  MMonSync *request;
  if (sync_full) {
    request = new MMonSync(MMonSync::OP_GET_COOKIE_FULL);
  } else {
    request = new MMonSync(MMonSync::OP_GET_COOKIE_RECENT);
    request->last_committed = paxos->get_version();
  }
  messenger->send_message(request, sync_provider);
}

。。。未完待续
罗列下monmap的各种操作
1.stop ceph-mon id=*
停掉mon-2,不停掉是操作不了monmap的db的
2.导出monmap:
ceph-mon -i ID-FOO --extract-monmap /tmp/monmap
3.查看monmap:
monmaptool --print -f /tmp/monmap
4.删除mon-3
monmaptool --rm mon-3 -f /tmp/monmap
5.注入monmap
ceph-mon -i ID --inject-monmap /tmp/monmap

 类似资料: