ceph pg peering和恢复 (2)

周麒
2023-12-01

主osd收到副osd发送来的MSG_OSD_PG_NOTIFY消息后,会将该消息中所带的osd的日志信息合并到本地。

主osd收到MSG_OSD_PG_NOTIFY消息后,会调用ms_fast_dispatch进行处理,其调用栈如下

OSD::ms_fast_dispatch(Message *m)  
	case MSG_OSD_PG_NOTIFY:  //pg中其他从osd发来的MOSDPGNotify信息
		return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));   
			for (auto& p : m->get_pg_list())  //p的类型是pair<pg_notify_t,PastIntervals>
				enqueue_peering_evt(
					  pgid,
					  PGPeeringEventRef(
					std::make_shared<PGPeeringEvent>(
					  p.first.epoch_sent,
					  p.first.query_epoch,
					  MNotifyRec(
						pgid, pg_shard_t(from, p.first.from),
						p.first,
						m->get_connection()->get_features(),
						p.second),
					  true,
					  new PGCreateInfo(
						pgid,
						p.first.query_epoch,
						p.first.info.history,
						p.second,
						false)
					  ))); 
					op_shardedwq.queue(
					  OpQueueItem(
						unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
						10,
						cct->_conf->osd_peering_op_priority,
						utime_t(),
						0,
						evt->get_epoch_sent()));  //这里插入的Op,在ShardedOpWQ::_process中被处理,如下

ShardedOpWQ的处理函数对该op的处理如下:

OSD::ShardedOpWQ::_process
	OpQueueItem item = sdata->pqueue->dequeue();
	slot->to_process.push_back(std::move(item));
	auto qi = std::move(slot->to_process.front());
		qi.run(osd, sdata, pg, tp_handle);				  
			PGPeeringItem::run
				osd->dequeue_peering_evt(sdata, pg.get(), evt, handle);
					pg->do_peering_event(evt, &rctx);
						recovery_state.handle_event(evt, rctx); //evt为上面传进去的 MNotifyRec , 创建pg后第一次执行的这里时传进来的为NullEvt
							start_handle(rctx);
							machine.process_event(evt->get_event()); //主osd在GetInfo状态收到MNotifyRec事件后,调用PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt)
								PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt)
									set<pg_shard_t>::iterator p = peer_info_requested.find(infoevt.from);
									if (p != peer_info_requested.end())
										peer_info_requested.erase(p);   //删除已经获取到信息的osd
										pg->blocked_by.erase(infoevt.from.osd);
									pg->proc_replica_info(infoevt.from, infoevt.notify.info, infoevt.notify.epoch_sent)  
										map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
										if (p != peer_info.end() && p->second.last_update == oinfo.last_update)  //信息已经存在并且last_update相等,则推出
											dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
											return false 
										if (!get_osdmap()->has_been_up_since(from.osd, send_epoch))  //这个osd现在是不是up的,且这个osd的up from 是不是小于等于send_epoch,  send_epoch为从osd发送notify时的opoch
											return false;
										peer_info[from] = oinfo;
										might_have_unfound.insert(from);
										update_history(oinfo.history); //利用对方的history更新自己的
											if (info.history.merge(new_history))
											dirty_info = true;
											if (info.history.last_epoch_clean >= info.history.same_interval_since)//已经修复完成了,不需要历史信息了
												past_intervals.clear()
												dirty_big_info = true;
										if (!is_up(from) && !is_acting(from))  //如果from都不在这两个集合里
										/*
											对于replica osd is_up就是从up vector里判断是否有from,相似,is_acting就是从acting vector里找, ec就不太一样了
										*/
											stray_set.insert(from);
											if (is_clean())
												purge_strays();  //发消息让这个osd去除pg
										if (p == peer_info.end())
											update_heartbeat_peers();
									if true
										if (old_start < pg->info.history.last_epoch_started)  //epoch_t old_start = pg->info.history.last_epoch_started;,即未处理replica info之前的
											ldout(pg->cct, 10) << " last_epoch_started moved forward, rebuilding prior" << dendl;
											prior_set = pg->build_prior();   //因为更改了一些变量,重新建立prior
											set<pg_shard_t>::iterator p = peer_info_requested.begin();

											while (p != peer_info_requested.end()) //从当前peer_info_requested中删除用不到的probe
											if (prior_set.probe.count(*p) == 0) {
												peer_info_requested.erase(p++);
											} else
												++p;
											get_infos();  //重新获取probe中的osd信息
										if (peer_info_requested.empty() && !prior_set.pg_down)//pg_down只有在past interval阶段osd up的数量不够,且有osd 处于down时才会被设置
											post_event(GotInfo());  //此时主osd处于GetInfo状态,接收到GotInfo后转入到GetLog状态,进入GetLog构造函数

(1)从peer_info_requested和blocked_by集合中删除该osd。
(2)调用update_history更新历史信息
(3)因为更新了一些变量,因此需要重新调用build_prior来重新计算要获取日志的osd列表,如果未新的osd,则需要向该osd get_infos。
(4)如果peer_info_requested为空且pg_down为false,则说明目前用于pg恢复的必要的osd的info都已经有了,因此抛出GotInfo事件进入GetLog状态。

GetLog的构造函数如下

PG::RecoveryState::GetLog::GetLog(my_context ctx)
	context< RecoveryMachine >().log_enter(state_name);  //"Started/Primary/Peering/GetLog"
	if (!pg->choose_acting(auth_log_shard, false, &context< Peering >().history_les_bound))	  //有选择一个拥有权威日志的OSD auth_log_shard
		if (!pg->want_acting.empty())   //monitor中的acting需要变化
            post_event(NeedActingChange());
		else
            post_event(IsIncomplete());
	if (auth_log_shard == pg->pg_whoami)
		post_event(GotLog());  
    if (pg->info.last_update < best.log_tail)
        post_event(IsIncomplete());
        return;
	context<RecoveryMachine>().send_query(auth_log_shard,pg_query_t(pg_query_t::LOG, auth_log_shard.shard, pg->pg_whoami.shard, request_log_from, pg->info.history,  pg->get_osdmap()->get_epoch()));
    pg->blocked_by.insert(auth_log_shard.osd); //返回dequeue_peering_evt函数的调用栈中, 并在dispatch_context中发送query

(1)调用choose_acting去计算acting集合,在初始情况下,monitor会为每一个pg预先分配一个acting集合,但是这个集合不一定准确,因此需要重新计算acting集合。acting集合负责对接客户端读写请求,即客户端直接通信的是acting集合的第一个osd,up集合是crush规则算出来了的,最后pg恢复完成后,acting会被赋值为up集合。如果choose_acting返回false并且want_acting不为空,则说明acting集合需要改变,则抛出NeedActingChange事件。如果acting集合不需要变化,继续往下。
(2)如果当前主OSD就是含有权威日治,则抛出GotLog事件。
(3)如果当前主OSD最新日志小于权威日志的最老日志,则该主OSD恢复不了,抛出IsIncomplete。
(4)调用send_query将query消息入队,query消息是用来向权威日志所在OSD索要权威日志的。其最终会在dispatch_context中发送。

choose_acting用来计算该PG的acting集合,如下

map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard = find_best_info(all_info, restrict_to_up_acting, history_les_bound);
if (auth_log_shard == all_info.end())
    if (up != acting)
        want_acting = up;
        osd->queue_want_pg_temp(info.pgid.pgid, empty);
    return false;
auth_log_shard_id = auth_log_shard->first;
calc_replicated_acting(auth_log_shard, get_osdmap()->get_pg_size(info.pgid.pgid), acting, up, up_primary, all_info, restrict_to_up_acting, &want, &want_backfill, &want_acting_backfill, ss);
    want->push_back(primary->first.osd); //将这里选举的primary作为want和acting_backfill的第一个,注意这里和monitor选举的可能不一样
    acting_backfill->insert(primary->first);
    
    for (vector<int>::const_iterator i = up.begin(); i != up.end(); ++i)
        if (cur_info.is_incomplete() || cur_info.last_update < oldest_auth_log_entry)] 其不能根据权威日志修复
            backfill->insert(up_cand);
            acting_backfill->insert(up_cand);
        else
            want->push_back(*i);
            acting_backfill->insert(up_cand);
    
    if (usable >= size) //如果当前up中的osd足够,则返回
        return;
        
    for (vector<int>::const_iterator i = acting.begin();i != acting.end(); ++i) //这个acting是从osdmap中获取出来的,monitor可以提前计算的acting 
        const pg_info_t &cur_info = all_info.find(acting_cand)->second;
        if (cur_info.is_incomplete() || cur_info.last_update < oldest_auth_log_entry)
        
        else
            candidate_by_last_update.push_back(make_pair(cur_info.last_update, *i));
    for (auto &p: candidate_by_last_update)    
        want->push_back(p.second);
        pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD);
        acting_backfill->insert(s);
        if (usable >= size) 
            return;
    
    for (map<pg_shard_t,pg_info_t>::const_iterator i = all_info.begin(); i != all_info.end();  ++i)
        if (i->second.is_incomplete() || i->second.last_update < oldest_auth_log_entry)
        else
            candidate_by_last_update.push_back(make_pair(i->second.last_update, i->first.osd));
    for (auto &p: candidate_by_last_update)
        want->push_back(p.second);
        pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD);
        acting_backfill->insert(s);
        usable++;
        if (usable >= size)
            return;
if (want != acting) { //需要向monitor申请acting(temp),因为这时候acting是从osdmap中的temppg里获取的,其为monitor提前计算的
    want_acting = want;
    if (want_acting == up)
        osd->queue_want_pg_temp(info.pgid.pgid, empty);
    else
        osd->queue_want_pg_temp(info.pgid.pgid, want);
    return false;

acting_recovery_backfill = want_acting_backfill;  
if (backfill_targets.empty())
    backfill_targets = want_backfill; //want_backfill是up集合中不能根据权威日志来恢复的osd,不能依据权威日志就是其last_update小于权威日治最老的版本
return true;

(1)对于副本模式,调用calc_replicated_acting去计算want, want_backfill, want_acting_backfill。
(2)将主OSD入队到want和want_acting_backfill集合。
(3)遍历up集合的OSD,如果OSD不能根据权威日志来reocvery,则加入到want_backfill和want_acting_backfill集合,否则加入到want和want_acting_backfill集合。
(4)遍历acting集合,注意此时这个acting集合是monitor预先分配的集合,这个预先分配的集合很可能不适用,所以需要改变。如果acting集合中的OSD可以根据权威日志恢复(last_update不小于权威日志的最老日志),则加入到want和want_acting_backfill集合。
(5)从all_info中获取,其依据也是能根据权威日志恢复,如果可以则加入到want和want_acting_backfill集合。
注意以上过程中,如果加入到want中的数量达到了pg的副本大小就退出。因此可以看到want集合保存了计算得到的真正的acting集合,want_backfill保存了up集合中需要执行backfilling操作的OSD,want_acting_backfill是want和want_backfill的并集。
(6)calc_replicated_acting返回后,如果want和acting集合不相等,就需要通知monitor去修改acting集合。这是通过queue_want_pg_temp实现的,queue_want_pg_temp只讲wang加入到pg_temp_wanted中,在函数调用栈返回到dequeue_peering_evt中后,通过调用service.send_pg_temp();来发送给monitor。

在GetLog状态里,如果主OSD就权威日志所在OSD,则抛出GotLog事件,GetLog状态对GotLog事件的处理如下:

PG::RecoveryState::GetLog::react(const GotLog&)
    if (msg) //如果主osd不是权威osd,则需要向权威osd索要日志,此时msg不为空
        pg->proc_master_log(*context<RecoveryMachine>().get_cur_transaction(),msg->info, msg->log, msg->missing,auth_log_shard);
            
        pg->start_flush(context< RecoveryMachine >().get_cur_transaction());
        return transit< GetMissing >();  //进入GetMissing状态
            PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
                context< RecoveryMachine >().log_enter(state_name);
                for (set<pg_shard_t>::iterator i = pg->acting_recovery_backfill.begin();i != pg->acting_recovery_backfill.end();++i)//acting_recovery_backfill就是want_acting_backfilling集合
                    const pg_info_t& pi = pg->peer_info[*i];
                    if (pi.last_update < pg->pg_log.get_tail()) //不需要拉取日志了,因为不能用日志来恢复了,只能执行backfilling
                        pg->peer_missing[*i].clear();
                        continue;
                    if (pi.last_backfill == hobject_t())
                        pg->peer_missing[*i].clear();
                        continue;
                    if (pi.last_update == pi.last_complete && pi.last_update == pg->info.last_update) 
                        pg->peer_missing[*i].clear();
                        continue;
                    since.epoch = pi.last_epoch_started;
                    if (pi.log_tail <= since)
                        context< RecoveryMachine >().send_query(*i, pg_query_t(pg_query_t::LOG, i->shard, pg->pg_whoami.shard, since, pg->info.history, pg->get_osdmap()->get_epoch()));
                    else
                        context< RecoveryMachine >().send_query(*i, pg_query_t(pg_query_t::FULLLOG,  i->shard, pg->pg_whoami.shard,pg->info.history, pg->get_osdmap()->get_epoch()));      
                    peer_missing_requested.insert(*i);
                    pg->blocked_by.insert(i->osd);
                if (peer_missing_requested.empty())
                    if (pg->need_up_thru)
                        post_event(NeedUpThru());
                    post_event(Activate(pg->get_osdmap()->get_epoch()));// all good!   当前子状态都处于Peering状态boost::statechart::transition< Activate, Active >,接收到Activate事件后,进入Active状态
                        context< RecoveryMachine >().log_enter(state_name); //"Started/Primary/Active"
                        pg->start_flush(context< RecoveryMachine >().get_cur_transaction());
                        pg->activate(*context< RecoveryMachine >().get_cur_transaction(),pg->get_osdmap()->get_epoch(),*context< RecoveryMachine >().get_query_map(),context< RecoveryMachine >().get_info_map(),context< RecoveryMachine >().get_recovery_ctx());
                        pg->publish_stats_to_osd();

(1)遍历acting_recovery_backfill集合,acting_recovery_backfill集合包括了acting集合和up中需要backfilling的osd集合,如果集合中的osd可以根据权威日志获取,则需要向这个osd发送query消息拉取日志,分为拉取全部日志和部分日志,并将此osd插入到peer_missing_requested中。
(2)如果peer_missing_requested为空,则说明不需要向其他osd拉取日志,其他osd不用恢复或者只用backfilling恢复,就抛出Activate事件。

如果拉取的日志返回来了,则抛出MLogRec事件,GetMissing对该事件处理如下

boost::statechart::result PG::RecoveryState::GetMissing::react(const MLogRec& logevt)
    peer_missing_requested.erase(logevt.from);
    pg->proc_replica_log(logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);
        pg_log.proc_replica_log(oinfo, olog, omissing, from);
    if (peer_missing_requested.empty()) 
        post_event(Activate(pg->get_osdmap()->get_epoch()));    

如果收到对方发送来的日志,就从peer_missing_requested中删除这个osd,并调用proc_replica_log处理副本日志,如果peer_missing_requested为空,则说明要reocvery的osd的日志都受到了,就抛出Activate事件。

 类似资料: