主osd收到副osd发送来的MSG_OSD_PG_NOTIFY消息后,会将该消息中所带的osd的日志信息合并到本地。
主osd收到MSG_OSD_PG_NOTIFY消息后,会调用ms_fast_dispatch进行处理,其调用栈如下
OSD::ms_fast_dispatch(Message *m)
case MSG_OSD_PG_NOTIFY: //pg中其他从osd发来的MOSDPGNotify信息
return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));
for (auto& p : m->get_pg_list()) //p的类型是pair<pg_notify_t,PastIntervals>
enqueue_peering_evt(
pgid,
PGPeeringEventRef(
std::make_shared<PGPeeringEvent>(
p.first.epoch_sent,
p.first.query_epoch,
MNotifyRec(
pgid, pg_shard_t(from, p.first.from),
p.first,
m->get_connection()->get_features(),
p.second),
true,
new PGCreateInfo(
pgid,
p.first.query_epoch,
p.first.info.history,
p.second,
false)
)));
op_shardedwq.queue(
OpQueueItem(
unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
10,
cct->_conf->osd_peering_op_priority,
utime_t(),
0,
evt->get_epoch_sent())); //这里插入的Op,在ShardedOpWQ::_process中被处理,如下
ShardedOpWQ的处理函数对该op的处理如下:
OSD::ShardedOpWQ::_process
OpQueueItem item = sdata->pqueue->dequeue();
slot->to_process.push_back(std::move(item));
auto qi = std::move(slot->to_process.front());
qi.run(osd, sdata, pg, tp_handle);
PGPeeringItem::run
osd->dequeue_peering_evt(sdata, pg.get(), evt, handle);
pg->do_peering_event(evt, &rctx);
recovery_state.handle_event(evt, rctx); //evt为上面传进去的 MNotifyRec , 创建pg后第一次执行的这里时传进来的为NullEvt
start_handle(rctx);
machine.process_event(evt->get_event()); //主osd在GetInfo状态收到MNotifyRec事件后,调用PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt)
PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt)
set<pg_shard_t>::iterator p = peer_info_requested.find(infoevt.from);
if (p != peer_info_requested.end())
peer_info_requested.erase(p); //删除已经获取到信息的osd
pg->blocked_by.erase(infoevt.from.osd);
pg->proc_replica_info(infoevt.from, infoevt.notify.info, infoevt.notify.epoch_sent)
map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
if (p != peer_info.end() && p->second.last_update == oinfo.last_update) //信息已经存在并且last_update相等,则推出
dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
return false
if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) //这个osd现在是不是up的,且这个osd的up from 是不是小于等于send_epoch, send_epoch为从osd发送notify时的opoch
return false;
peer_info[from] = oinfo;
might_have_unfound.insert(from);
update_history(oinfo.history); //利用对方的history更新自己的
if (info.history.merge(new_history))
dirty_info = true;
if (info.history.last_epoch_clean >= info.history.same_interval_since)//已经修复完成了,不需要历史信息了
past_intervals.clear()
dirty_big_info = true;
if (!is_up(from) && !is_acting(from)) //如果from都不在这两个集合里
/*
对于replica osd is_up就是从up vector里判断是否有from,相似,is_acting就是从acting vector里找, ec就不太一样了
*/
stray_set.insert(from);
if (is_clean())
purge_strays(); //发消息让这个osd去除pg
if (p == peer_info.end())
update_heartbeat_peers();
if true
if (old_start < pg->info.history.last_epoch_started) //epoch_t old_start = pg->info.history.last_epoch_started;,即未处理replica info之前的
ldout(pg->cct, 10) << " last_epoch_started moved forward, rebuilding prior" << dendl;
prior_set = pg->build_prior(); //因为更改了一些变量,重新建立prior
set<pg_shard_t>::iterator p = peer_info_requested.begin();
while (p != peer_info_requested.end()) //从当前peer_info_requested中删除用不到的probe
if (prior_set.probe.count(*p) == 0) {
peer_info_requested.erase(p++);
} else
++p;
get_infos(); //重新获取probe中的osd信息
if (peer_info_requested.empty() && !prior_set.pg_down)//pg_down只有在past interval阶段osd up的数量不够,且有osd 处于down时才会被设置
post_event(GotInfo()); //此时主osd处于GetInfo状态,接收到GotInfo后转入到GetLog状态,进入GetLog构造函数
(1)从peer_info_requested和blocked_by集合中删除该osd。
(2)调用update_history更新历史信息
(3)因为更新了一些变量,因此需要重新调用build_prior来重新计算要获取日志的osd列表,如果未新的osd,则需要向该osd get_infos。
(4)如果peer_info_requested为空且pg_down为false,则说明目前用于pg恢复的必要的osd的info都已经有了,因此抛出GotInfo事件进入GetLog状态。
GetLog的构造函数如下
PG::RecoveryState::GetLog::GetLog(my_context ctx)
context< RecoveryMachine >().log_enter(state_name); //"Started/Primary/Peering/GetLog"
if (!pg->choose_acting(auth_log_shard, false, &context< Peering >().history_les_bound)) //有选择一个拥有权威日志的OSD auth_log_shard
if (!pg->want_acting.empty()) //monitor中的acting需要变化
post_event(NeedActingChange());
else
post_event(IsIncomplete());
if (auth_log_shard == pg->pg_whoami)
post_event(GotLog());
if (pg->info.last_update < best.log_tail)
post_event(IsIncomplete());
return;
context<RecoveryMachine>().send_query(auth_log_shard,pg_query_t(pg_query_t::LOG, auth_log_shard.shard, pg->pg_whoami.shard, request_log_from, pg->info.history, pg->get_osdmap()->get_epoch()));
pg->blocked_by.insert(auth_log_shard.osd); //返回dequeue_peering_evt函数的调用栈中, 并在dispatch_context中发送query
(1)调用choose_acting去计算acting集合,在初始情况下,monitor会为每一个pg预先分配一个acting集合,但是这个集合不一定准确,因此需要重新计算acting集合。acting集合负责对接客户端读写请求,即客户端直接通信的是acting集合的第一个osd,up集合是crush规则算出来了的,最后pg恢复完成后,acting会被赋值为up集合。如果choose_acting返回false并且want_acting不为空,则说明acting集合需要改变,则抛出NeedActingChange事件。如果acting集合不需要变化,继续往下。
(2)如果当前主OSD就是含有权威日治,则抛出GotLog事件。
(3)如果当前主OSD最新日志小于权威日志的最老日志,则该主OSD恢复不了,抛出IsIncomplete。
(4)调用send_query将query消息入队,query消息是用来向权威日志所在OSD索要权威日志的。其最终会在dispatch_context中发送。
choose_acting用来计算该PG的acting集合,如下
map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard = find_best_info(all_info, restrict_to_up_acting, history_les_bound);
if (auth_log_shard == all_info.end())
if (up != acting)
want_acting = up;
osd->queue_want_pg_temp(info.pgid.pgid, empty);
return false;
auth_log_shard_id = auth_log_shard->first;
calc_replicated_acting(auth_log_shard, get_osdmap()->get_pg_size(info.pgid.pgid), acting, up, up_primary, all_info, restrict_to_up_acting, &want, &want_backfill, &want_acting_backfill, ss);
want->push_back(primary->first.osd); //将这里选举的primary作为want和acting_backfill的第一个,注意这里和monitor选举的可能不一样
acting_backfill->insert(primary->first);
for (vector<int>::const_iterator i = up.begin(); i != up.end(); ++i)
if (cur_info.is_incomplete() || cur_info.last_update < oldest_auth_log_entry)] 其不能根据权威日志修复
backfill->insert(up_cand);
acting_backfill->insert(up_cand);
else
want->push_back(*i);
acting_backfill->insert(up_cand);
if (usable >= size) //如果当前up中的osd足够,则返回
return;
for (vector<int>::const_iterator i = acting.begin();i != acting.end(); ++i) //这个acting是从osdmap中获取出来的,monitor可以提前计算的acting
const pg_info_t &cur_info = all_info.find(acting_cand)->second;
if (cur_info.is_incomplete() || cur_info.last_update < oldest_auth_log_entry)
else
candidate_by_last_update.push_back(make_pair(cur_info.last_update, *i));
for (auto &p: candidate_by_last_update)
want->push_back(p.second);
pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD);
acting_backfill->insert(s);
if (usable >= size)
return;
for (map<pg_shard_t,pg_info_t>::const_iterator i = all_info.begin(); i != all_info.end(); ++i)
if (i->second.is_incomplete() || i->second.last_update < oldest_auth_log_entry)
else
candidate_by_last_update.push_back(make_pair(i->second.last_update, i->first.osd));
for (auto &p: candidate_by_last_update)
want->push_back(p.second);
pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD);
acting_backfill->insert(s);
usable++;
if (usable >= size)
return;
if (want != acting) { //需要向monitor申请acting(temp),因为这时候acting是从osdmap中的temppg里获取的,其为monitor提前计算的
want_acting = want;
if (want_acting == up)
osd->queue_want_pg_temp(info.pgid.pgid, empty);
else
osd->queue_want_pg_temp(info.pgid.pgid, want);
return false;
acting_recovery_backfill = want_acting_backfill;
if (backfill_targets.empty())
backfill_targets = want_backfill; //want_backfill是up集合中不能根据权威日志来恢复的osd,不能依据权威日志就是其last_update小于权威日治最老的版本
return true;
(1)对于副本模式,调用calc_replicated_acting去计算want, want_backfill, want_acting_backfill。
(2)将主OSD入队到want和want_acting_backfill集合。
(3)遍历up集合的OSD,如果OSD不能根据权威日志来reocvery,则加入到want_backfill和want_acting_backfill集合,否则加入到want和want_acting_backfill集合。
(4)遍历acting集合,注意此时这个acting集合是monitor预先分配的集合,这个预先分配的集合很可能不适用,所以需要改变。如果acting集合中的OSD可以根据权威日志恢复(last_update不小于权威日志的最老日志),则加入到want和want_acting_backfill集合。
(5)从all_info中获取,其依据也是能根据权威日志恢复,如果可以则加入到want和want_acting_backfill集合。
注意以上过程中,如果加入到want中的数量达到了pg的副本大小就退出。因此可以看到want集合保存了计算得到的真正的acting集合,want_backfill保存了up集合中需要执行backfilling操作的OSD,want_acting_backfill是want和want_backfill的并集。
(6)calc_replicated_acting返回后,如果want和acting集合不相等,就需要通知monitor去修改acting集合。这是通过queue_want_pg_temp实现的,queue_want_pg_temp只讲wang加入到pg_temp_wanted中,在函数调用栈返回到dequeue_peering_evt中后,通过调用service.send_pg_temp();来发送给monitor。
在GetLog状态里,如果主OSD就权威日志所在OSD,则抛出GotLog事件,GetLog状态对GotLog事件的处理如下:
PG::RecoveryState::GetLog::react(const GotLog&)
if (msg) //如果主osd不是权威osd,则需要向权威osd索要日志,此时msg不为空
pg->proc_master_log(*context<RecoveryMachine>().get_cur_transaction(),msg->info, msg->log, msg->missing,auth_log_shard);
pg->start_flush(context< RecoveryMachine >().get_cur_transaction());
return transit< GetMissing >(); //进入GetMissing状态
PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
context< RecoveryMachine >().log_enter(state_name);
for (set<pg_shard_t>::iterator i = pg->acting_recovery_backfill.begin();i != pg->acting_recovery_backfill.end();++i)//acting_recovery_backfill就是want_acting_backfilling集合
const pg_info_t& pi = pg->peer_info[*i];
if (pi.last_update < pg->pg_log.get_tail()) //不需要拉取日志了,因为不能用日志来恢复了,只能执行backfilling
pg->peer_missing[*i].clear();
continue;
if (pi.last_backfill == hobject_t())
pg->peer_missing[*i].clear();
continue;
if (pi.last_update == pi.last_complete && pi.last_update == pg->info.last_update)
pg->peer_missing[*i].clear();
continue;
since.epoch = pi.last_epoch_started;
if (pi.log_tail <= since)
context< RecoveryMachine >().send_query(*i, pg_query_t(pg_query_t::LOG, i->shard, pg->pg_whoami.shard, since, pg->info.history, pg->get_osdmap()->get_epoch()));
else
context< RecoveryMachine >().send_query(*i, pg_query_t(pg_query_t::FULLLOG, i->shard, pg->pg_whoami.shard,pg->info.history, pg->get_osdmap()->get_epoch()));
peer_missing_requested.insert(*i);
pg->blocked_by.insert(i->osd);
if (peer_missing_requested.empty())
if (pg->need_up_thru)
post_event(NeedUpThru());
post_event(Activate(pg->get_osdmap()->get_epoch()));// all good! 当前子状态都处于Peering状态boost::statechart::transition< Activate, Active >,接收到Activate事件后,进入Active状态
context< RecoveryMachine >().log_enter(state_name); //"Started/Primary/Active"
pg->start_flush(context< RecoveryMachine >().get_cur_transaction());
pg->activate(*context< RecoveryMachine >().get_cur_transaction(),pg->get_osdmap()->get_epoch(),*context< RecoveryMachine >().get_query_map(),context< RecoveryMachine >().get_info_map(),context< RecoveryMachine >().get_recovery_ctx());
pg->publish_stats_to_osd();
(1)遍历acting_recovery_backfill集合,acting_recovery_backfill集合包括了acting集合和up中需要backfilling的osd集合,如果集合中的osd可以根据权威日志获取,则需要向这个osd发送query消息拉取日志,分为拉取全部日志和部分日志,并将此osd插入到peer_missing_requested中。
(2)如果peer_missing_requested为空,则说明不需要向其他osd拉取日志,其他osd不用恢复或者只用backfilling恢复,就抛出Activate事件。
如果拉取的日志返回来了,则抛出MLogRec事件,GetMissing对该事件处理如下
boost::statechart::result PG::RecoveryState::GetMissing::react(const MLogRec& logevt)
peer_missing_requested.erase(logevt.from);
pg->proc_replica_log(logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);
pg_log.proc_replica_log(oinfo, olog, omissing, from);
if (peer_missing_requested.empty())
post_event(Activate(pg->get_osdmap()->get_epoch()));
如果收到对方发送来的日志,就从peer_missing_requested中删除这个osd,并调用proc_replica_log处理副本日志,如果peer_missing_requested为空,则说明要reocvery的osd的日志都受到了,就抛出Activate事件。