ceph monitor的一个主要功能是使用paxos分布式式协议维护一个key/value数据库的一致性(最主要的就是各个map的一致性,对于monitor而言,即monmap)。12.2.2版本所使用的数据库引擎从原来的leveldb转变为了rocksdb。
之前有一个疑惑,monitor在部署的时候,可以通过ceph.conf文件mon_host的ip获取,那么monmap是否根据静态的ceph.conf文件保持一致呢?
看过paxos算法才知道,答案是否定的,它依赖于monmap文件。(每次map文件变更前, 需要多数monitor节点同意, 确保多数或所有(quorum)节点的map文件一致, 并且map文件是增量更新的, 每次更新产生一个新的version, 所以当mon节点离线再加入时, 需要同步增量部分的信息)
说起维护monitor的数据库一致性,不得不说MonitorDBStore类是如何抽象对k/v数据库的操作的。从MonitorDBStore类的定义出发,一步一步探测它的具体实现:
//每一个op可以看做一个原子性的key/value批量更新
struct Op {
uint8_t type;
string prefix;//为了各个模块不相互干扰,每个模块会选择一个前缀 string key, endkey;//prefix+key构成了各个模块独立的key
bufferlist bl;
这些op将形成事务被encode,decode。
//Transaction类来说明一个事务包含的所有操作(put erase compact)
struct Transaction;
typedef ceph::shared_ptr<Transaction> TransactionRef;
struct Transaction {
list<Op> ops;
uint64_t bytes, keys;
Transaction() : bytes(0), keys(0) {}
enum {
OP_PUT = 1,//设置 key value
OP_ERASE = 2,//移除
OP_COMPACT = 3,//适配??
};
为了在服务器间的传送,所有的transaction都会经过encode,decode。
void encode(bufferlist& bl) const {
ENCODE_START(2, 1, bl);
::encode(ops, bl);
::encode(bytes, bl);
::encode(keys, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
DECODE_START(2, bl);
::decode(ops, bl);
if (struct_v >= 2) {
::decode(bytes, bl);
::decode(keys, bl);
}
DECODE_FINISH(bl);
}
接下来就是ceph 后端的处理:
int apply_transaction(MonitorDBStore::TransactionRef t) {
KeyValueDB::Transaction dbt = db->get_transaction();
...
//获取transaction中需要执行的op,同步操作所有compact的 key->value
list<pair<string, pair<string,string> > > compact;
//
for (list<Op>::const_iterator it = t->ops.begin();
it != t->ops.end();
++it) {
const Op& op = *it;
switch (op.type) {
case Transaction::OP_PUT:
dbt->set(op.prefix, op.key, op.bl);
break;
case Transaction::OP_ERASE:
dbt->rmkey(op.prefix, op.key);
break;
case Transaction::OP_COMPACT:
compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
break;
default:
derr << __func__ << " unknown op type " << op.type << dendl;
ceph_abort();
break;
}
}
//submit value
int r = db->submit_transaction_sync(dbt);
if (r >= 0) {
while (!compact.empty()) {
if (compact.front().second.first == string() &&
compact.front().second.second == string())
db->compact_prefix_async(compact.front().first);
else
db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
compact.pop_front();
}
} else {
assert(0 == "failed to write to db");
}
return r;
}
不同prefix对应的transaction 间使用queue_transaction函数进行异步处理。
void queue_transaction(MonitorDBStore::TransactionRef t,
Context *oncommit) {
io_work.queue(new C_DoTransaction(this, t, oncommit));
}
而多种prefix对应的transaction的获取通过get_iterator函数,而每种transaction对应的key value则通过get函数。
其他的函数不再此一一解释。
从monitor启动,我们来看一下MonitorDBstore发生的一系列基于key/value的更新。
每次monitor启动时都会按照monmap中的服务器地址去连接其他monitor服务器,并同步数据。这里有两种情况,一种是db中不存在monmap(需要执行mkfs重新产生),另一种是已经添加过的mon节点由于网络或者其他原因异常,恢复正常后的重启(直接从db中读取)。
很明显所有的节点都会从无到有,因此这里的两种情况其实只是单存的区分获取monmap的方式而已。注意一点:monmap很重要,一旦产生,之后的启动不会再重新配置,因此一定要确保配置的正确性。
接下来先讲一下mkfs的大致流程:
if (mkfs) {
// resolve public_network -> public_addr
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
common_init_finish(g_ceph_context);
bufferlist monmapbl, osdmapbl;
std::string error;
MonMap monmap;
// load or generate monmap
...
try {
monmap.decode(monmapbl);
// always mark seed/mkfs monmap as epoch 0
monmap.set_epoch(0);
}
...
ostringstream oss;
//根据ceph.conf对该节点的monmap进行build init
int err = monmap.build_initial(g_ceph_context, oss);
...
}
具体的实现过程如下:
int MonMap::build_initial(CephContext *cct, ostream& errout)
{
const md_config_t *conf = cct->_conf;
...
// -m foo?根据ceph.conf中noname的mon_host生成加入的mon的ip
if (!conf->mon_host.empty()) {
int r = build_from_host_list(conf->mon_host, "noname-");
// What monitors are in the config file?
std::vector <std::string> sections;
int ret = conf->get_all_sections(sections);
if (ret) {
errout << "Unable to find any monitors in the configuration "
<< "file, because there was an error listing the sections. error "
<< ret << std::endl;
return -ENOENT;
}
std::vector <std::string> mon_names;
for (std::vector <std::string>::const_iterator s = sections.begin();
s != sections.end(); ++s) {
if ((s->substr(0, 4) == "mon.") && (s->size() > 4)) {
mon_names.push_back(s->substr(4));
}
}
// Find an address for each monitor in the config file.
for (std::vector <std::string>::const_iterator m = mon_names.begin();
m != mon_names.end(); ++m) {
std::vector <std::string> sections;
std::string m_name("mon");
m_name += ".";
m_name += *m;
sections.push_back(m_name);
sections.push_back("mon");
sections.push_back("global");
std::string val;
int res = conf->get_val_from_conf_file(sections, "mon addr", val, true);
if (res) {
errout << "failed to get an address for mon." << *m << ": error "
<< res << std::endl;
continue;
}
entity_addr_t addr;
if (!addr.parse(val.c_str())) {
errout << "unable to parse address for mon." << *m
<< ": addr='" << val << "'" << std::endl;
continue;
}
if (addr.get_port() == 0)
addr.set_port(CEPH_MON_PORT);
uint16_t priority = 0;
if (!conf->get_val_from_conf_file(sections, "mon priority", val, false)) {
try {
priority = std::stoul(val);
} catch (std::logic_error&) {
errout << "unable to parse priority for mon." << *m
<< ": priority='" << val << "'" << std::endl;
continue;
}
}
// the make sure this mon isn't already in the map
if (contains(addr))
remove(get_name(addr));
if (contains(*m))
remove(*m);
add(mon_info_t{*m, addr, priority});
}
if (size() == 0) {
// no info found from conf options lets try use DNS SRV records
string srv_name = conf->mon_dns_srv_name;
string domain;
// check if domain is also provided and extract it from srv_name
size_t idx = srv_name.find("_");
if (idx != string::npos) {
domain = srv_name.substr(idx + 1);
srv_name = srv_name.substr(0, idx);
}
map<string, DNSResolver::Record> records;
if (DNSResolver::get_instance()->resolve_srv_hosts(cct, srv_name,
DNSResolver::SRV_Protocol::TCP, domain, &records) != 0) {
errout << "unable to get monitor info from DNS SRV with service name: " <<
"ceph-mon" << std::endl;
}
else {
for (const auto& record : records) {
add(mon_info_t{record.first,
record.second.addr,
record.second.priority});
}
}
}
if (size() == 0) {
errout << "no monitors specified to connect to." << std::endl;
return -ENOENT;
}
created = ceph_clock_now();
last_changed = created;
return 0;
}
就这样新的mon产生了,接下来为了以后直接读取,将该monitor的信息存入到 MonitorDBStore的db中
MonitorDBStore store(g_conf->mon_data);
ostringstream oss;
//默认打开方式rocksdb
int r = store.create_and_open(oss);
//创建mon实例
Monitor mon(g_ceph_context, g_conf->name.get_id(), &store, 0, 0, &monmap);
新建本地monmap
MonitorDBStore *store = new MonitorDBStore(g_conf->mon_data);
{
ostringstream oss;
err = store->open(oss);
if (oss.tellp())
derr << oss.str() << dendl;
if (err < 0) {
derr << "error opening mon data directory at '"
<< g_conf->mon_data << "': " << cpp_strerror(err) << dendl;
prefork.exit(1);
}
}
bufferlist magicbl;
err = store->get(Monitor::MONITOR_NAME, "magic", magicbl);
if (err || !magicbl.length()) {
derr << "unable to read magic from mon data" << dendl;
prefork.exit(1);
}
string magic(magicbl.c_str(), magicbl.length()-1); // ignore trailing \n
if (strcmp(magic.c_str(), CEPH_MON_ONDISK_MAGIC)) {
derr << "mon fs magic '" << magic << "' != current '" << CEPH_MON_ONDISK_MAGIC << "'" << dendl;
prefork.exit(1);
}
err = Monitor::check_features(store);
if (err < 0) {
derr << "error checking features: " << cpp_strerror(err) << dendl;
prefork.exit(1);
}
// inject new monmap?
if (!inject_monmap.empty()) {
bufferlist bl;
std::string error;
int r = bl.read_file(inject_monmap.c_str(), &error);
if (r) {
derr << "unable to read monmap from " << inject_monmap << ": "
<< error << dendl;
prefork.exit(1);
}
// get next version
version_t v = store->get("monmap", "last_committed");
dout(0) << "last committed monmap epoch is " << v << ", injected map will be " << (v+1)
<< dendl;
v++;
// set the version
MonMap tmp;
tmp.decode(bl);
if (tmp.get_epoch() != v) {
dout(0) << "changing monmap epoch from " << tmp.get_epoch()
<< " to " << v << dendl;
tmp.set_epoch(v);
}
bufferlist mapbl;
tmp.encode(mapbl, CEPH_FEATURES_ALL);
bufferlist final;
::encode(v, final);
::encode(mapbl, final);
auto t(std::make_shared<MonitorDBStore::Transaction>());
// save it
t->put("monmap", v, mapbl);
t->put("monmap", "latest", final);
t->put("monmap", "last_committed", v);
store->apply_transaction(t);
dout(0) << "done." << dendl;
prefork.exit(0);
}
// monmap?
MonMap monmap;
{
// note that even if we don't find a viable monmap, we should go ahead
// and try to build it up in the next if-else block.
bufferlist mapbl;
int err = obtain_monmap(*store, mapbl);
if (err >= 0) {
try {
monmap.decode(mapbl);
} catch (const buffer::error& e) {
derr << "can't decode monmap: " << e.what() << dendl;
}
} else {
derr << "unable to obtain a monmap: " << cpp_strerror(err) << dendl;
}
if (!extract_monmap.empty()) {
int r = mapbl.write_file(extract_monmap.c_str());
if (r < 0) {
r = -errno;
derr << "error writing monmap to " << extract_monmap << ": " << cpp_strerror(r) << dendl;
prefork.exit(1);
}
derr << "wrote monmap to " << extract_monmap << dendl;
prefork.exit(0);
}
}
//之后就是绑定msg等,此处不赘余。
想必大家已经知晓那并非新加入的monitor节点是从db中直接获取的了。。。哈哈
接下来便是我们非常重要的一节,monitor启动时的数据同步。
每次monitor server启动时都会按照monmap中的服务器地址去连接其他monitor服务器,并同步数据。这个过程叫做bootstrap(). bootstrap的第一个目的是补全数据,从其他服务拉缺失的paxos log或者全量复制数据库,其次是在必要时形成多数派建立一个paxos集群或者加入到已有的多数派中。
int Monitor::init()
{
..
bootstrap();//同步数据,建立多数派
// add features of myself into feature_map
session_map.feature_map.add_mon(con_self->get_features());
return 0;
}
看一下bootstrap()的具体实现:
void Monitor::bootstrap()
{
..
// reset
state = STATE_PROBING;
_reset();
// sync store
if (g_conf->mon_compact_on_bootstrap) {
dout(10) << "bootstrap -- triggering compaction" << dendl;
store->compact();
dout(10) << "bootstrap -- finished compaction" << dendl;
}
// singleton monitor?
if (monmap->size() == 1 && rank == 0) {
win_standalone_election();
return;
}
reset_probe_timeout();
// i'm outside the quorum
if (monmap->contains(name))
outside_quorum.insert(name);
// probe monitors
dout(10) << "probing other monitors" << dendl;
for (unsigned i = 0; i < monmap->size(); i++) {
if ((int)i != rank)
messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
monmap->get_inst(i));
}
for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
p != extra_probe_peers.end();
++p) {
if (*p != messenger->get_myaddr()) {
entity_inst_t i;
i.name = entity_name_t::MON(-1);
i.addr = *p;
//给其它所有它知道的mon发送探测包
messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
}
}
}
接下来所有收到OP_PROBE的节点,根据handle_probe中定义的op类型,采用函数handle_probe_probe开始处理了:
void Monitor::handle_probe_probe(MonOpRequestRef op)
{
...
if (!is_probing() && !is_synchronizing()) {
// If the probing mon is way ahead of us, we need to re-bootstrap.
// Normally we capture this case when we initially bootstrap, but
// it is possible we pass those checks (we overlap with
// quorum-to-be) but fail to join a quorum before it moves past
// us. We need to be kicked back to bootstrap so we can
// synchonize, not keep calling elections.
if (paxos->get_version() + 1 < m->paxos_first_version) {
dout(1) << " peer " << m->get_source_addr() << " has first_committed "
<< "ahead of us, re-bootstrapping" << dendl;
bootstrap();
//由于版本号相差较大,无法通过paxos算法部分做数据修复,,本进程需要重启bootstrap从对方主动拉数据。
goto out;
}
}
//正常流程,汇报paxos的状态,其主要的参量为last_commit,first_commit
//此处不同于paxos的数据恢复,因为其主要依赖参量last_pn,first_pn,该部分已在paxos做讲解,此处不赘余。
MMonProbe *r;
r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
r->name = name;
r->quorum = quorum;
monmap->encode(r->monmap_bl, m->get_connection()->get_features());
r->paxos_first_version = paxos->get_first_committed();
r->paxos_last_version = paxos->get_version();
m->get_connection()->send_message(r);
...
}
发送探测包的monitor节点,在最大超时时间内收到OP_REPLY,会调用handle_probe_reply函数进行下一步处理。
void Monitor::handle_probe_reply(MonOpRequestRef op)
{
MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
dout(10) << " monmap is " << *monmap << dendl;
// discover name and addrs during probing or electing states.
if (!is_probing() && !is_electing()) {
return;
}
// newer map, or they've joined a quorum and we haven't?
bufferlist mybl;
monmap->encode(mybl, m->get_connection()->get_features());
// make sure it's actually different; the checks below err toward
// taking the other guy's map, which could cause us to loop.
if (!mybl.contents_equal(m->monmap_bl)) {
MonMap *newmap = new MonMap;
newmap->decode(m->monmap_bl);
if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
!has_ever_joined)) {
dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
<< ", mine was " << monmap->get_epoch() << dendl;
delete newmap;
monmap->decode(m->monmap_bl);
bootstrap();
return;
}
delete newmap;
}
// rename peer?
string peer_name = monmap->get_name(m->get_source_addr());
if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
dout(10) << " renaming peer " << m->get_source_addr() << " "
<< peer_name << " -> " << m->name << " in my monmap"
<< dendl;
monmap->rename(peer_name, m->name);
if (is_electing()) {
bootstrap();
return;
}
} else {
dout(10) << " peer name is " << peer_name << dendl;
}
// new initial peer?
if (monmap->get_epoch() == 0 &&
monmap->contains(m->name) &&
monmap->get_addr(m->name).is_blank_ip()) {
dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
monmap->set_addr(m->name, m->get_source_addr());
bootstrap();
return;
}
// end discover phase
if (!is_probing()) {
return;
}
assert(paxos != NULL);
if (is_synchronizing()) {
dout(10) << " currently syncing" << dendl;
return;
}
entity_inst_t other = m->get_source_inst();
//同步情况1:一种情况是我的最后一条log记录和对方的第一条log记录之间有空隙,中间有缺失,只能主动从对方拉数据。
if (paxos->get_version() < m->paxos_first_version &&
m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
dout(10) << " peer paxos first versions [" << m->paxos_first_version
<< "," << m->paxos_last_version << "]"
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
<< dendl;
cancel_probe_timeout();
sync_start(other, true);//开始同步
return;
}
//同步情况2:根据配置变量paxos_max_join_drift,数据并没有缺失,但是要传的log超过一个阀值,不如全量从对方复制数据。
if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
dout(10) << " peer paxos last version " << m->paxos_last_version
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
<< dendl;
cancel_probe_timeout();
sync_start(other, false);
return;
}
}
// is there an existing quorum?
if (m->quorum.size()) {// 多数派列表非空
dout(10) << " existing quorum " << m->quorum << dendl;
dout(10) << " peer paxos version " << m->paxos_last_version
<< " vs my version " << paxos->get_version()
<< " (ok)"
<< dendl;
if (monmap->contains(name) &&
!monmap->get_addr(name).is_blank_ip()) {
// 我的地址他们都知道了, 通过start_election选举后可以加入多数派
start_election();
} else {
//如果对方也不是当前多数派的一员,并且是属于monmap的一员,那么把它
//列入到在多数派外面的人
if (monmap->contains(m->name)) {
dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
outside_quorum.insert(m->name);
} else {
dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
m->put();
return;
}
//一旦发现不在多数派的人数超过2F + 1 (包括自己), 说明集群不存在多
//数派,就可以通过选举来形成多数派
unsigned need = monmap->size() / 2 + 1;
dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
if (outside_quorum.size() >= need) {
if (outside_quorum.count(name)) {
dout(10) << " that's enough to form a new quorum, calling election" << dendl;
start_election();
} else {
dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
}
} else {
dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
}
} }
}
接下来就是数据的同步部分了:
void Monitor::sync_start(entity_inst_t &other, bool full)
{
dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
assert(state == STATE_PROBING ||
state == STATE_SYNCHRONIZING);
state = STATE_SYNCHRONIZING;
// make sure are not a provider for anyone!
sync_reset_provider();
sync_full = full;
if (sync_full) {
// stash key state, and mark that we are syncing
auto t(std::make_shared<MonitorDBStore::Transaction>());
sync_stash_critical_state(t);
t->put("mon_sync", "in_sync", 1);
sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
<< sync_last_committed_floor << dendl;
t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
store->apply_transaction(t);//处理事务了。。。
assert(g_conf->mon_sync_requester_kill_at != 1);
// clear the underlying store
set<string> targets = get_sync_targets_names();
dout(10) << __func__ << " clearing prefixes " << targets << dendl;
store->clear(targets);
// make sure paxos knows it has been reset. this prevents a
// bootstrap and then different probe reply order from possibly
// deciding a partial or no sync is needed.
paxos->init();
assert(g_conf->mon_sync_requester_kill_at != 2);
}
// assume 'other' as the leader. We will update the leader once we receive
// a reply to the sync start.
sync_provider = other;
sync_reset_timeout();
MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
if (!sync_full)
m->last_committed = paxos->get_version();
messenger->send_message(m, sync_provider);
}
。。。未完待续
罗列下monmap的各种操作
1.stop ceph-mon id=*
停掉mon-2,不停掉是操作不了monmap的db的
2.导出monmap:
ceph-mon -i ID-FOO –extract-monmap /tmp/monmap
3.查看monmap:
monmaptool –print -f /tmp/monmap
4.删除mon-3
monmaptool –rm mon-3 -f /tmp/monmap
5.注入monmap
ceph-mon -i ID –inject-monmap /tmp/monmap