// fhgfs_meta\source\app\App.cpp
void App::initComponents(TargetConsistencyState initialConsistencyState)
throw(ComponentInitException)
{
...
this->buddyResyncer = new BuddyResyncer();
this->internodeSyncer = new InternodeSyncer(initialConsistencyState);
...
}
void App::startComponents()
{
...
this->internodeSyncer->start();
...
}
// fhgfs_meta\source\app\App.h
class App : public AbstractApp
{
...
InternodeSyncer* getInternodeSyncer() const
{
return internodeSyncer;
}
BuddyResyncer* getBuddyResyncer()
{
return this->buddyResyncer;
}
...
}
// fhgfs_common\source\common\toolkit\OfflineWaitTimeoutTk.h
/**
* Calculates the offline wait timeout from config variables. It consists of:
* 5 sec InternodeSyncer syncLoop interval.
* 3 * update interval:
* until target gets pofflined (2x)
* end of offline timeout until next target state update (worst case)
* plus the actual target offline timeout.
*
* Templated for the Config type because Storage and Meta server have different Config classes.
*/
template<typename Cfg>
class OfflineWaitTimeoutTk
{
public:
static unsigned int calculate(Cfg* cfg)
{
const unsigned updateTargetStatesSecs = cfg->getSysUpdateTargetStatesSecs();
if (updateTargetStatesSecs != 0)
{
// If sysUpdateTargetStateSecs is set in config, use that value.
return (
( 5
+ 3 * updateTargetStatesSecs
+ cfg->getSysTargetOfflineTimeoutSecs()
) * 1000);
}
else
{
// If sysUpdateTargetStatesSecs hasn't been set in config, it defaults to 1/3 the value
// of sysTargetOfflineTimeoutSecs -> we use 3 * 1/3 sysTargetOfflineTimeoutSecs.
return (
( 5
+ 2 * cfg->getSysTargetOfflineTimeoutSecs()
) * 1000);
}
}
};
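A worked example with hypothetical values: if sysTargetOfflineTimeoutSecs is 180 and sysUpdateTargetStatesSecs is left at 0, the second branch applies and the wait is (5 + 2 * 180) * 1000 = 365000 ms, a bit over six minutes; if sysUpdateTargetStatesSecs were explicitly set to 30, the first branch would give (5 + 3 * 30 + 180) * 1000 = 275000 ms.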
// fhgfs_meta\source\storage\NodeOfflineWait.h
class NodeOfflineWait
{
public:
NodeOfflineWait(Config* cfg)
: waitTimeoutMS(OfflineWaitTimeoutTk<Config>::calculate(cfg) ),
active(false)
{ }
}
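Only the constructor of NodeOfflineWait is quoted here; the startTimer() and hasTimeout() calls used by the InternodeSyncer below suggest a simple armed-timer object. The following is a minimal standalone sketch of that behaviour for illustration only, not the actual BeeGFS class:
// Illustrative sketch, not the BeeGFS NodeOfflineWait implementation.
#include <chrono>

class OfflineWaitSketch
{
   public:
      explicit OfflineWaitSketch(unsigned waitTimeoutMS)
         : waitTimeoutMS(waitTimeoutMS), active(false)
      { }

      void startTimer()
      {
         startTime = std::chrono::steady_clock::now();
         active = true;
      }

      // true while the node still has to sit out the calculated offline wait
      bool hasTimeout()
      {
         if (!active)
            return false;

         const long long elapsedMS = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - startTime).count();

         if (elapsedMS >= static_cast<long long>(waitTimeoutMS))
            active = false; // wait is over; normal state reporting may resume

         return active;
      }

   private:
      unsigned waitTimeoutMS;
      bool active;
      std::chrono::steady_clock::time_point startTime;
};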
// fhgfs_meta\source\components\InternodeSyncer.cpp
InternodeSyncer::InternodeSyncer(TargetConsistencyState initialConsistencyState)
throw(ComponentInitException)
: PThread("XNodeSync"),
log("XNodeSync"),
forcePoolsUpdate(true),
forceTargetStatesUpdate(true),
forcePublishCapacities(true),
offlineWait(Program::getApp()->getConfig() ),
nodeConsistencyState(initialConsistencyState),
buddyResyncInProgress(false)
{
MirrorBuddyGroupMapper* mbg = Program::getApp()->getMetaBuddyGroupMapper();
MirrorBuddyState buddyState = mbg->getBuddyState(Program::getApp()->getLocalNodeNumID().val() );
if ((buddyState == BuddyState_PRIMARY)
&& (nodeConsistencyState == TargetConsistencyState_NEEDS_RESYNC))
offlineWait.startTimer();
}
// fhgfs_meta\source\components\InternodeSyncer.h
class InternodeSyncer : public PThread
{
public:
InternodeSyncer(TargetConsistencyState initialConsistencyState) throw(ComponentInitException);
virtual ~InternodeSyncer() { }
static bool registerNode(AbstractDatagramListener* dgramLis);
static bool updateMetaStatesAndBuddyGroups(TargetConsistencyState& outConsistencyState,
bool publish);
static void syncClients(const std::vector<NodeHandle>& clientsList, bool allowRemoteComm);
static bool downloadAndSyncNodes();
static bool downloadAndSyncTargetMappings();
static bool downloadAndSyncTargetStatesAndBuddyGroups();
static bool downloadAndSyncCapacityPools();
static void downloadAndSyncClients(bool requeue);
static bool downloadAllExceededQuotaLists();
static bool downloadExceededQuotaList(QuotaDataType idType, QuotaLimitType exType,
UIntList* outIDList, FhgfsOpsErr& error);
static void printSyncResults(NodeType nodeType, NumNodeIDList* addedNodes,
NumNodeIDList* removedNodes);
private:
LogContext log;
Mutex forcePoolsUpdateMutex;
Mutex forceTargetStatesUpdateMutex;
Mutex forcePublishCapacitiesMutex;
bool forcePoolsUpdate; // true to force update of capacity pools
bool forceTargetStatesUpdate; // true to force update of node state
bool forcePublishCapacities; // true to force publishing free capacity
// Keeps track of the timeout during which the node may not send state reports because it is
// waiting to be offlined by the mgmtd.
NodeOfflineWait offlineWait;
Mutex nodeConsistencyStateMutex;
TargetConsistencyState nodeConsistencyState; // Node's own consistency state.
// Note: This is initialized when updateMetaStates... is called from App::downloadMgmtInfo.
AtomicUInt32 buddyResyncInProgress;
virtual void run();
void syncLoop();
static bool updateMetaCapacityPools();
static bool updateMetaBuddyCapacityPools();
static bool updateStorageCapacityPools();
static bool updateTargetBuddyCapacityPools();
static bool downloadCapacityPools(CapacityPoolQueryType poolType, UInt16List* outListNormal,
UInt16List* outListLow, UInt16List* outListEmergency);
void publishNodeCapacity();
void forceMgmtdPoolsRefresh();
void dropIdleConns();
unsigned dropIdleConnsByStore(NodeStoreServersEx* nodes);
void getStatInfo(int64_t* outSizeTotal, int64_t* outSizeFree, int64_t* outInodesTotal,
int64_t* outInodesFree);
static TargetConsistencyState decideResync(const CombinedTargetState newState);
static bool publishNodeStateChange(const TargetConsistencyState oldState,
const TargetConsistencyState newState);
public:
// inliners
void setForcePoolsUpdate()
{
std::lock_guard<Mutex> lock(forcePoolsUpdateMutex);
forcePoolsUpdate = true;
}
void setForceTargetStatesUpdate()
{
std::lock_guard<Mutex> lock(forceTargetStatesUpdateMutex);
forceTargetStatesUpdate = true;
}
void setForcePublishCapacities()
{
std::lock_guard<Mutex> lock(forcePublishCapacitiesMutex);
forcePublishCapacities = true;
}
TargetConsistencyState getNodeConsistencyState()
{
std::lock_guard<Mutex> lock(nodeConsistencyStateMutex);
return nodeConsistencyState;
}
void setNodeConsistencyState(TargetConsistencyState newState)
{
std::lock_guard<Mutex> lock(nodeConsistencyStateMutex);
nodeConsistencyState = newState;
}
void setResyncInProgress(bool resyncInProgress)
{
this->buddyResyncInProgress.set(resyncInProgress);
}
bool getResyncInProgress()
{
return this->buddyResyncInProgress.read();
}
private:
// inliners
bool getAndResetForcePoolsUpdate()
{
std::lock_guard<Mutex> lock(forcePoolsUpdateMutex);
bool retVal = forcePoolsUpdate;
forcePoolsUpdate = false;
return retVal;
}
bool getAndResetForceTargetStatesUpdate()
{
std::lock_guard<Mutex> lock(forceTargetStatesUpdateMutex);
bool retVal = forceTargetStatesUpdate;
forceTargetStatesUpdate = false;
return retVal;
}
bool getAndResetForcePublishCapacities()
{
std::lock_guard<Mutex> lock(forcePublishCapacitiesMutex);
bool retVal = forcePublishCapacities;
forcePublishCapacities = false;
return retVal;
}
};
// fhgfs_meta\source\app\App.cpp
/**
* @throw InvalidConfigException on error
*/
void App::runNormal()
{
...
bool downloadRes = downloadMgmtInfo(initialConsistencyState);
if (!downloadRes)
{
log->log(1, "Downloading target states from management node failed. Shutting down...");
appResult = APPCODE_INITIALIZATION_ERROR;
return;
}
...
}
/**
* Downloads the list of nodes, targets and buddy groups (for meta and storage servers) from the
* mgmtd.
*
* @param outInitialConsistencyState The consistency state the local meta node has on the mgmtd
* before any state reports are sent.
*/
bool App::downloadMgmtInfo(TargetConsistencyState& outInitialConsistencyState)
{
Config* cfg = this->getConfig();
int retrySleepTimeMS = 10000; // 10sec
unsigned udpListenPort = cfg->getConnMetaPortUDP();
bool allSuccessful = false;
// start temporary registration datagram listener
RegistrationDatagramListener regDGramLis(netFilter, localNicList, ackStore, udpListenPort);
regDGramLis.start();
// loop until we're registered and everything is downloaded (or until we got interrupted)
do
{
// register ourselves
// (note: node registration needs to be done before downloads to get notified of updates)
if (!InternodeSyncer::registerNode(&regDGramLis) )
continue;
// download all mgmt info the HBM cares for
if (!InternodeSyncer::downloadAndSyncNodes() ||
!InternodeSyncer::downloadAndSyncTargetMappings() ||
!InternodeSyncer::downloadAndSyncTargetStatesAndBuddyGroups() ||
!InternodeSyncer::downloadAndSyncCapacityPools())
continue;
InternodeSyncer::downloadAndSyncClients(false);
// ...and then the InternodeSyncer's part.
if (!InternodeSyncer::updateMetaStatesAndBuddyGroups(outInitialConsistencyState, false) )
continue;
if(!InternodeSyncer::downloadAllExceededQuotaLists() )
continue;
allSuccessful = true;
break;
} while(!waitForSelfTerminateOrder(retrySleepTimeMS) );
// stop temporary registration datagram listener
regDGramLis.selfTerminate();
regDGramLis.sendDummyToSelfUDP(); // for faster termination
regDGramLis.join();
if(allSuccessful)
log->log(Log_NOTICE, "Registration and management info download complete.");
return allSuccessful;
}
// fhgfs_meta\source\components\InternodeSyncer.cpp
void InternodeSyncer::run()
{
try
{
registerSignalHandler();
syncLoop();
log.log(Log_DEBUG, "Component stopped.");
}
catch(std::exception& e)
{
PThread::getCurrentThreadApp()->handleComponentException(e);
}
}
void InternodeSyncer::syncLoop()
{
App* app = Program::getApp();
Config* cfg = app->getConfig();
const int sleepIntervalMS = 3*1000; // 3sec
// If (undocumented) sysUpdateTargetStatesSecs is set in config, use that value, otherwise
// default to 1/6 sysTargetOfflineTimeoutSecs.
const unsigned updateTargetStatesMS =
(cfg->getSysUpdateTargetStatesSecs() != 0)
? cfg->getSysUpdateTargetStatesSecs() * 1000
: cfg->getSysTargetOfflineTimeoutSecs() * 166;
const unsigned updateCapacityPoolsMS = 4 * updateTargetStatesMS;
const unsigned metaCacheSweepNormalMS = 5*1000; // 5sec
const unsigned metaCacheSweepStressedMS = 2*1000; // 2sec
const unsigned idleDisconnectIntervalMS = 70*60*1000; /* 70 minutes (must be less than half the
streamlis idle disconnect interval to avoid cases where streamlis disconnects first) */
const unsigned updateIDTimeMS = 60 * 1000; // 1 min
const unsigned downloadNodesIntervalMS = 300000; // 5 min
Time lastCapacityUpdateT;
Time lastMetaCacheSweepT;
Time lastIdleDisconnectT;
Time lastTimeIDSet;
Time lastTargetStatesUpdateT;
Time lastDownloadNodesT;
Time lastCapacityPublishedT;
unsigned currentCacheSweepMS = metaCacheSweepNormalMS; // (adapted inside the loop below)
while(!waitForSelfTerminateOrder(sleepIntervalMS) )
{
bool doCapacityPoolsUpdate = getAndResetForcePoolsUpdate()
|| (lastCapacityUpdateT.elapsedMS() > updateCapacityPoolsMS);
bool doTargetStatesUpdate = getAndResetForceTargetStatesUpdate()
|| (lastTargetStatesUpdateT.elapsedMS() > updateTargetStatesMS);
bool doPublishCapacities = getAndResetForcePublishCapacities()
|| (lastCapacityPublishedT.elapsedMS() > updateTargetStatesMS);
// download & sync nodes
if(lastDownloadNodesT.elapsedMS() > downloadNodesIntervalMS)
{
downloadAndSyncNodes();
downloadAndSyncTargetMappings();
lastDownloadNodesT.setToNow();
}
if(doCapacityPoolsUpdate)
{
downloadAndSyncCapacityPools();
lastCapacityUpdateT.setToNow();
}
if(lastMetaCacheSweepT.elapsedMS() > currentCacheSweepMS)
{
bool flushTriggered = app->getMetaStore()->cacheSweepAsync();
currentCacheSweepMS = (flushTriggered ? metaCacheSweepStressedMS : metaCacheSweepNormalMS);
lastMetaCacheSweepT.setToNow();
}
if(lastIdleDisconnectT.elapsedMS() > idleDisconnectIntervalMS)
{
dropIdleConns();
lastIdleDisconnectT.setToNow();
}
if(lastTimeIDSet.elapsedMS() > updateIDTimeMS)
{
StorageTk::resetIDCounterToNow();
lastTimeIDSet.setToNow();
}
if(doTargetStatesUpdate)
{
if (this->offlineWait.hasTimeout() )
{
// if we're waiting to be offlined, set our local state to needs-resync and don't report
// anything to the mgmtd
setNodeConsistencyState(TargetConsistencyState_NEEDS_RESYNC);
}
else
{
TargetConsistencyState newConsistencyState;
updateMetaStatesAndBuddyGroups(newConsistencyState, true);
setNodeConsistencyState(newConsistencyState);
downloadAndSyncTargetStatesAndBuddyGroups();
}
lastTargetStatesUpdateT.setToNow();
}
if (doPublishCapacities)
{
publishNodeCapacity();
lastCapacityPublishedT.setToNow();
}
}
}
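The loop above multiplexes several periodic tasks over a single 3-second wait: each task keeps its own last-run timestamp, and on every wakeup the loop runs whichever tasks have exceeded their interval and resets their timestamp. A compact standalone sketch of this scheduling pattern, using std::chrono and an atomic stop flag instead of the BeeGFS Time and waitForSelfTerminateOrder() primitives:
// Illustrative sketch of the syncLoop scheduling pattern, not BeeGFS source.
#include <atomic>
#include <chrono>
#include <thread>

using Clock = std::chrono::steady_clock;

static long long elapsedMS(Clock::time_point since)
{
   return std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - since).count();
}

void syncLoopSketch(std::atomic<bool>& stopRequested)
{
   const long long sleepIntervalMS = 3000;       // wakeup granularity, as in syncLoop()
   const long long taskAIntervalMS = 30 * 1000;  // e.g. a state update interval
   const long long taskBIntervalMS = 300 * 1000; // e.g. a node download interval

   Clock::time_point lastTaskA = Clock::now();
   Clock::time_point lastTaskB = Clock::now();

   while (!stopRequested.load())
   {
      std::this_thread::sleep_for(std::chrono::milliseconds(sleepIntervalMS));

      if (elapsedMS(lastTaskA) > taskAIntervalMS)
      {
         // ... run task A (e.g. update target states) ...
         lastTaskA = Clock::now();
      }

      if (elapsedMS(lastTaskB) > taskBIntervalMS)
      {
         // ... run task B (e.g. download the node list) ...
         lastTaskB = Clock::now();
      }
   }
}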
// fhgfs_meta\source\components\InternodeSyncer.cpp
/**
* Download and sync metadata server target states and mirror buddy groups.
*
* @param outConsistencyState The new node consistency state.
*/
bool InternodeSyncer::updateMetaStatesAndBuddyGroups(TargetConsistencyState& outConsistencyState,
bool publish)
{
LOG_TOP(STATESYNC, DEBUG, "Starting state update.");
App* app = Program::getApp();
NodeStore* mgmtNodes = app->getMgmtNodes();
TargetStateStore* metaStateStore = app->getMetaStateStore();
MirrorBuddyGroupMapper* buddyGroupMapper = app->getMetaBuddyGroupMapper();
static bool downloadFailedLogged = false; // to avoid log spamming
static bool publishFailedLogged = false;
NumNodeID localNodeID = app->getLocalNodeNumID();
auto node = mgmtNodes->referenceFirstNode();
if(!node)
{
LOG_TOP(STATESYNC, ERR, "Management node not defined.");
return false;
}
UInt16List buddyGroupIDs;
UInt16List primaryNodeIDs;
UInt16List secondaryNodeIDs;
UInt16List nodeIDs; // this should actually be targetIDs, but MDS doesn't have targets yet
UInt8List reachabilityStates;
UInt8List consistencyStates;
unsigned numRetries = 10; // If publishing states fails 10 times, give up (-> POFFLINE).
// Note: Publishing fails if between downloadStatesAndBuddyGroups and
// publishLocalTargetStateChanges, a state on the mgmtd is changed (e.g. because the primary
// sets NEEDS_RESYNC for the secondary). In that case, we will retry.
LOG_TOP(STATESYNC, DEBUG, "Beginning target state update...");
bool publishSuccess = false;
while (!publishSuccess && (numRetries--) )
{
// In case we're already retrying, clear out leftover data from the lists.
buddyGroupIDs.clear();
primaryNodeIDs.clear();
secondaryNodeIDs.clear();
nodeIDs.clear();
reachabilityStates.clear();
consistencyStates.clear();
bool downloadRes = NodesTk::downloadStatesAndBuddyGroups(*node, NODETYPE_Meta, &buddyGroupIDs,
&primaryNodeIDs, &secondaryNodeIDs, &nodeIDs, &reachabilityStates, &consistencyStates,
true);
if (!downloadRes)
{
if (!downloadFailedLogged)
{
LOG_TOP(STATESYNC, WARNING,
"Downloading target states from management node failed. "
"Setting all target states to probably-offline.");
downloadFailedLogged = true;
}
metaStateStore->setAllStates(TargetReachabilityState_POFFLINE);
break;
}
downloadFailedLogged = false;
UInt8List oldConsistencyStates = consistencyStates;
// Sync buddy groups here, because decideResync depends on it.
metaStateStore->syncStatesAndGroupsFromLists(buddyGroupMapper, nodeIDs, reachabilityStates,
consistencyStates, buddyGroupIDs, primaryNodeIDs, secondaryNodeIDs, localNodeID);
CombinedTargetState newStateFromMgmtd;
// Find local state which was sent by mgmtd
for (ZipIterRange<UInt16List, UInt8List, UInt8List>
statesFromMgmtdIter(nodeIDs, reachabilityStates, consistencyStates);
!statesFromMgmtdIter.empty(); ++statesFromMgmtdIter)
{
if (*(statesFromMgmtdIter()->first) == localNodeID.val())
{
newStateFromMgmtd = CombinedTargetState(
TargetReachabilityState(*(statesFromMgmtdIter()->second) ),
TargetConsistencyState(*(statesFromMgmtdIter()->third) ) );
}
}
TargetConsistencyState localChangedState = decideResync(newStateFromMgmtd);
if (!publish)
{
outConsistencyState = localChangedState;
metaStateStore->setState(localNodeID.val(),
CombinedTargetState(TargetReachabilityState_ONLINE, localChangedState) );
return true;
}
// Note: In this case "old" means "before we changed it locally".
TargetConsistencyState oldState = newStateFromMgmtd.consistencyState;
publishSuccess = publishNodeStateChange(oldState, localChangedState);
if (publishSuccess)
{
outConsistencyState = localChangedState;
metaStateStore->setState(localNodeID.val(),
CombinedTargetState(TargetReachabilityState_ONLINE, localChangedState) );
BuddyCommTk::checkBuddyNeedsResync();
}
}
if (!publishSuccess)
{
if (!publishFailedLogged)
{
LOG_TOP(STATESYNC, WARNING, "Pushing local state to management node failed.");
publishFailedLogged = true;
}
}
else
publishFailedLogged = false;
return true;
}
Only the primary node performs this buddy resync check.
// fhgfs_meta\source\toolkit\BuddyCommTk.cpp
namespace BuddyCommTk
{
static const std::string BUDDY_NEEDS_RESYNC_FILENAME = "buddyneedsresync";
void checkBuddyNeedsResync()
{
App* app = Program::getApp();
MirrorBuddyGroupMapper* metaBuddyGroups = app->getMetaBuddyGroupMapper();
TargetStateStore* metaNodeStates = app->getMetaStateStore();
InternodeSyncer* internodeSyncer = app->getInternodeSyncer();
BuddyResyncer* buddyResyncer = app->getBuddyResyncer();
NumNodeID localID = app->getLocalNodeNumID();
bool isPrimary;
NumNodeID buddyID = NumNodeID(metaBuddyGroups->getBuddyTargetID(localID.val(), &isPrimary) );
if (isPrimary) // Only do the check if we are the primary.
{
const bool buddyNeedsResyncFileExists = getBuddyNeedsResync();
if (buddyNeedsResyncFileExists)
{
LOG_DEBUG(__func__, Log_NOTICE, "buddyneedsresync file found.");
CombinedTargetState state = CombinedTargetState(TargetReachabilityState_ONLINE,
TargetConsistencyState_NEEDS_RESYNC);
metaNodeStates->getState(buddyID.val(), state);
// Only send message if buddy was still reported as GOOD before (otherwise the mgmtd
// already knows it needs a resync, or it's BAD and shouldn't be resynced anyway).
if (state.consistencyState == TargetConsistencyState_GOOD)
{
setBuddyNeedsResyncState(true);
LogContext(__func__).log(Log_NOTICE, "Set needs-resync state for buddy node.");
}
}
// check if the secondary is set to needs-resync by the mgmtd.
TargetConsistencyState consistencyState = internodeSyncer->getNodeConsistencyState();
// If our own state is not good, don't start resync (wait until InternodeSyncer sets us
// good again).
if (consistencyState != TargetConsistencyState_GOOD)
{
LOG_DEBUG(__func__, Log_DEBUG,
"Local node state is not good, won't check buddy state.");
return;
}
CombinedTargetState buddyState;
if (!metaNodeStates->getState(buddyID.val(), buddyState) )
{
LOG_DEBUG(__func__, Log_DEBUG, "Buddy state is invalid for node ID "
+ buddyID.str() + ".");
return;
}
if (buddyState == CombinedTargetState(TargetReachabilityState_ONLINE,
TargetConsistencyState_NEEDS_RESYNC) )
{
FhgfsOpsErr resyncRes = buddyResyncer->startResync();
if (resyncRes == FhgfsOpsErr_SUCCESS)
{
LOG(WARNING, "Starting buddy resync job.", as("Buddy node ID", buddyID.val()));
}
else if (resyncRes == FhgfsOpsErr_INUSE)
{
LOG(WARNING, "Resync job currently running.", as("Buddy node ID", buddyID.val()));
}
else
{
LOG(WARNING, "Starting buddy resync job failed.", as("Buddy node ID", buddyID.val()));
}
}
}
}
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncer.cpp
FhgfsOpsErr BuddyResyncer::startResync()
{
std::lock_guard<Mutex> lock(jobMutex);
if (noNewResyncs)
return FhgfsOpsErr_INTERRUPTED;
if (!job)
{
job = new BuddyResyncJob();
job->start();
return FhgfsOpsErr_SUCCESS;
}
switch (job->getState())
{
case BuddyResyncJobState_NOTSTARTED:
case BuddyResyncJobState_RUNNING:
return FhgfsOpsErr_INUSE;
default:
// a job must never be started more than once. to ensure this, we create a new job for
// every resync process, because doing so allows us to use NOTSTARTED and RUNNING as
// "job is currently active" values. otherwise, a second resync may see state SUCCESS and
// allow duplicate resyncer activity.
// if a job is still active, don't wait for very long - it may take a while to finish. the
// internode syncer will retry periodically, so this will work fine.
if (!job->timedjoin(10))
return FhgfsOpsErr_INUSE;
delete job;
job = new BuddyResyncJob();
job->start();
return FhgfsOpsErr_SUCCESS;
}
}
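The comment above captures the design: a PThread-based job object can only be started once, so the BuddyResyncer keeps at most one job and creates a fresh BuddyResyncJob for every resync attempt, returning FhgfsOpsErr_INUSE while the previous one is still active. A standalone sketch of the same pattern with std::thread (names are illustrative, not BeeGFS API):
// Illustrative "one job object per run" sketch, not the BeeGFS BuddyResyncer.
#include <atomic>
#include <mutex>
#include <thread>

class SingleJobRunnerSketch
{
   public:
      // Returns false if the previous job is still running; the caller retries later,
      // which mirrors FhgfsOpsErr_INUSE in BuddyResyncer::startResync().
      bool startJob()
      {
         std::lock_guard<std::mutex> lock(jobMutex);

         if (job.joinable())
         {
            if (!finished.load())
               return false; // previous job still active

            job.join(); // previous job done: reap it before creating a new one
         }

         // a thread object (like a BuddyResyncJob) cannot be started twice,
         // so every run gets a fresh one
         finished.store(false);
         job = std::thread([this] {
            // ... resync work would go here ...
            finished.store(true);
         });

         return true;
      }

      ~SingleJobRunnerSketch()
      {
         std::lock_guard<std::mutex> lock(jobMutex);
         if (job.joinable())
            job.join();
      }

   private:
      std::mutex jobMutex;
      std::thread job;
      std::atomic<bool> finished{false};
};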
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.h
class BuddyResyncJob : public PThread
{
public:
BuddyResyncJob();
~BuddyResyncJob();
virtual void run();
void abort();
MetaBuddyResyncJobStatistics getJobStats();
private:
BuddyResyncJobState state;
Mutex stateMutex;
int64_t startTime;
int64_t endTime;
NumNodeID buddyNodeID;
MetaSyncCandidateStore syncCandidates;
std::unique_ptr<BuddyResyncerGatherSlave> gatherSlave;
std::vector<std::unique_ptr<BuddyResyncerBulkSyncSlave>> bulkSyncSlaves;
std::unique_ptr<BuddyResyncerModSyncSlave> modSyncSlave;
std::unique_ptr<SessionStoreResyncer> sessionStoreResyncer;
bool startGatherSlaves();
bool startSyncSlaves();
void joinGatherSlaves();
public:
BuddyResyncJobState getState()
{
std::lock_guard<Mutex> lock(stateMutex);
return state;
}
bool isRunning()
{
std::lock_guard<Mutex> lock(stateMutex);
return state == BuddyResyncJobState_RUNNING;
}
void enqueue(MetaSyncCandidateFile syncCandidate, PThread* caller)
{
syncCandidates.add(std::move(syncCandidate), caller);
}
private:
void setState(const BuddyResyncJobState state)
{
LOG_DEBUG(__func__, Log_DEBUG, "Setting state: "
+ StringTk::uintToStr(static_cast<int>(state) ) );
std::lock_guard<Mutex> lock(stateMutex);
this->state = state;
}
TargetConsistencyState newBuddyState();
void informBuddy(const TargetConsistencyState newTargetState);
void informMgmtd(const TargetConsistencyState newTargetState);
void stopAllWorkersOn(Barrier& barrier);
};
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.cpp
BuddyResyncJob::BuddyResyncJob() :
PThread("BuddyResyncJob"),
state(BuddyResyncJobState_NOTSTARTED),
startTime(0), endTime(0),
gatherSlave(boost::make_unique<BuddyResyncerGatherSlave>(&syncCandidates))
{
App* app = Program::getApp();
Config* cfg = app->getConfig();
buddyNodeID =
NumNodeID(app->getMetaBuddyGroupMapper()->getBuddyTargetID(app->getLocalNodeNumID().val()));
const unsigned numSyncSlaves = std::max<unsigned>(cfg->getTuneNumResyncSlaves(), 1);
for (size_t i = 0; i < numSyncSlaves; i++)
bulkSyncSlaves.emplace_back(
boost::make_unique<BuddyResyncerBulkSyncSlave>(*this, &syncCandidates, i, buddyNodeID));
sessionStoreResyncer = boost::make_unique<SessionStoreResyncer>(buddyNodeID);
modSyncSlave = boost::make_unique<BuddyResyncerModSyncSlave>(*this, &syncCandidates, 1, buddyNodeID);
}
void BuddyResyncJob::run()
{
const char* logContext = "Run resync job";
InternodeSyncer* internodeSyncer = Program::getApp()->getInternodeSyncer();
App* app = Program::getApp();
WorkerList* workers = app->getWorkers();
NodeStore* metaNodes = app->getMetaNodes();
const std::string metaPath = app->getMetaPath();
const std::string metaBuddyMirPath = app->getMetaPath() + "/" + CONFIG_BUDDYMIRROR_SUBDIR_NAME;
Barrier workerBarrier(workers->size() + 1);
bool workersStopped = false;
startTime = time(NULL);
syncCandidates.clear();
char* respBuf = NULL;
NetMessage* respMsg = NULL;
auto buddyNode = metaNodes->referenceNode(buddyNodeID);
if (!buddyNode)
{
LOG(ERR, "Unable to resolve buddy node. Resync will not start.");
setState(BuddyResyncJobState_FAILURE);
goto cleanup;
}
DEBUG_ENV_VAR(unsigned, DIE_AT_RESYNC_N, 0, "BEEGFS_RESYNC_DIE_AT_N");
if (DIE_AT_RESYNC_N) {
static unsigned resyncs = 0;
// for #479: terminating a server at this point caused the workers to terminate before the
// resyncer had communicated with them, causing a deadlock on shutdown
if (++resyncs == DIE_AT_RESYNC_N) {
::kill(0, SIGTERM);
sleep(4);
}
}
stopAllWorkersOn(workerBarrier);
{
// Notify buddy that resync started and wait for confirmation
StorageResyncStartedMsg msg(buddyNodeID.val());
const bool commRes = MessagingTk::requestResponse(*buddyNode, &msg,
NETMSGTYPE_StorageResyncStartedResp, &respBuf, &respMsg);
if (!commRes)
{
LogContext(logContext).logErr("Unable to notify buddy about resync attempt. "
"Resync will not start.");
setState(BuddyResyncJobState_FAILURE);
workerBarrier.wait();
goto cleanup;
}
SAFE_DELETE(respMsg);
SAFE_DELETE(respBuf);
// resync could have been aborted before we got here. if so, exit as soon as possible without
// setting the resync job state to something else.
{
std::unique_lock<Mutex> lock(stateMutex);
if (state == BuddyResyncJobState_INTERRUPTED)
{
lock.unlock();
workerBarrier.wait();
goto cleanup;
}
state = BuddyResyncJobState_RUNNING;
}
internodeSyncer->setResyncInProgress(true);
const bool startGatherSlaveRes = startGatherSlaves();
if (!startGatherSlaveRes)
{
setState(BuddyResyncJobState_FAILURE);
workerBarrier.wait();
goto cleanup;
}
const bool startResyncSlaveRes = startSyncSlaves();
if (!startResyncSlaveRes)
{
setState(BuddyResyncJobState_FAILURE);
workerBarrier.wait();
goto cleanup;
}
}
workerBarrier.wait();
LOG_DEBUG(__func__, Log_DEBUG, "Going to join gather slaves.");
joinGatherSlaves();
LOG_DEBUG(__func__, Log_DEBUG, "Joined gather slaves.");
LOG_DEBUG(__func__, Log_DEBUG, "Going to join sync slaves.");
// gather slaves have finished. Tell sync slaves to stop when work packages are empty and wait.
for (auto it = bulkSyncSlaves.begin(); it != bulkSyncSlaves.end(); ++it)
{
(*it)->setOnlyTerminateIfIdle(true);
(*it)->selfTerminate();
}
for (auto it = bulkSyncSlaves.begin(); it != bulkSyncSlaves.end(); ++it)
(*it)->join();
// here we can be in one of two situations:
// 1. bulk resync has succeeded. we then totally stop the workers: the session store must be in
// a quiescent state for resync, so for simplicity, we suspend all client operations here.
// we do not want to do this any earlier than this point, because bulk syncers may take a
// very long time to complete.
// 2. bulk resync has failed. in this case, the bulk syncers have aborted the currently running
// job, and the mod syncer is either dead or in the process of dying. here we MUST NOT stop
// the workers, because they are very likely blocked on the mod sync queue already and will
// not unblock before the queue is cleared.
if (getState() == BuddyResyncJobState_RUNNING)
{
stopAllWorkersOn(workerBarrier);
workersStopped = true;
}
modSyncSlave->setOnlyTerminateIfIdle(true);
modSyncSlave->selfTerminate();
modSyncSlave->join();
// gatherers are done and the workers have been stopped, we can safely resync the session now.
LOG_DEBUG(__func__, Log_DEBUG, "Joined sync slaves.");
// Perform session store resync
// the job may have been aborted or terminated by errors. in this case, do not resync the session
// store. end the sync as quickly as possible.
if (getState() == BuddyResyncJobState_RUNNING)
sessionStoreResyncer->doSync();
// session store is now synced, and future actions can be forwarded safely. we do not restart
// the workers here because the resync may still enter FAILED state, and we don't want to forward
// to the secondary in this case.
cleanup:
bool syncErrors = false;
{
std::lock_guard<Mutex> lock(gatherSlave->stateMutex);
while (gatherSlave->isRunning)
gatherSlave->isRunningChangeCond.wait(&gatherSlave->stateMutex);
syncErrors |= gatherSlave->getStats().errors != 0;
}
for (auto it = bulkSyncSlaves.begin(); it != bulkSyncSlaves.end(); ++it)
{
BuddyResyncerBulkSyncSlave* slave = it->get();
std::lock_guard<Mutex> lock(slave->stateMutex);
while (slave->isRunning)
slave->isRunningChangeCond.wait(&slave->stateMutex);
syncErrors |= slave->getStats().dirErrors != 0;
syncErrors |= slave->getStats().fileErrors != 0;
}
syncErrors |= sessionStoreResyncer->getStats().errors;
{
std::lock_guard<Mutex> lock(modSyncSlave->stateMutex);
while (modSyncSlave->isRunning)
modSyncSlave->isRunningChangeCond.wait(&modSyncSlave->stateMutex);
syncErrors |= modSyncSlave->getStats().errors != 0;
}
if (getState() == BuddyResyncJobState_RUNNING || getState() == BuddyResyncJobState_INTERRUPTED)
{
if (syncErrors)
setState(BuddyResyncJobState_ERRORS);
else if (getState() == BuddyResyncJobState_RUNNING)
setState(BuddyResyncJobState_SUCCESS);
// delete timestamp override file if it exists.
BuddyCommTk::setBuddyNeedsResync(metaPath, false, buddyNodeID);
const TargetConsistencyState buddyState = newBuddyState();
informBuddy(buddyState);
informMgmtd(buddyState);
const bool interrupted = getState() != BuddyResyncJobState_SUCCESS;
LOG(WARNING, "Resync finished.", interrupted, syncErrors);
}
internodeSyncer->setResyncInProgress(false);
endTime = time(NULL);
// restart all the worker threads
if (workersStopped)
workerBarrier.wait();
// if the resync was aborted, the mod sync queue may still contain items. additionally, workers
// may be waiting for a changeset slot, or they may have started executing after the resync was
// aborted by the sync slaves, but before the resync was officially set to "not running".
// we cannot set the resync to "not running" in abort() because we have no upper bound for the
// number of worker threads. even if we did set the resync to "not running" in abort() and
// cleared the sync queues at the same time, there may still be an arbitrary number of threads
// waiting for a changeset slot.
// instead, we have to wait for each thread to "see" that the resync is over, and periodically
// clear the sync queue to unblock those workers that are still waiting for slots.
if (syncErrors)
{
SynchronizedCounter counter;
for (auto it = workers->begin(); it != workers->end(); ++it)
{
auto& worker = **it;
worker.getWorkQueue()->addPersonalWork(
new IncSyncedCounterWork(&counter),
worker.getPersonalWorkQueue());
}
while (!counter.timedWaitForCount(workers->size(), 100))
{
while (!syncCandidates.isFilesEmpty())
{
MetaSyncCandidateFile candidate;
syncCandidates.fetch(candidate, this);
candidate.signal();
}
}
}
}
// fhgfs_storage\source\components\buddyresyncer\BuddyResyncerGatherSlave.h
class BuddyResyncerGatherSlave : public PThread
{
friend class BuddyResyncer; // (to grant access to internal mutex)
friend class BuddyResyncJob; // (to grant access to internal mutex)
public:
BuddyResyncerGatherSlave(uint16_t targetID, ChunkSyncCandidateStore* syncCandidates,
BuddyResyncerGatherSlaveWorkQueue* workQueue, uint8_t slaveID);
virtual ~BuddyResyncerGatherSlave();
void workLoop();
private:
AtomicSizeT onlyTerminateIfIdle; // atomic quasi-bool
Mutex statusMutex; // protects isRunning
Condition isRunningChangeCond;
uint16_t targetID;
AtomicUInt64 numChunksDiscovered;
AtomicUInt64 numChunksMatched;
AtomicUInt64 numDirsDiscovered;
AtomicUInt64 numDirsMatched;
bool isRunning; // true if an instance of this component is currently running
ChunkSyncCandidateStore* syncCandidates;
BuddyResyncerGatherSlaveWorkQueue* workQueue;
// nftw() callback needs access to the slave threads
static Mutex staticGatherSlavesMutex;
static std::map<std::string, BuddyResyncerGatherSlave*> staticGatherSlaves;
virtual void run();
static int handleDiscoveredEntry(const char* path, const struct stat* statBuf,
int ftwEntryType, struct FTW* ftwBuf);
public:
// getters & setters
bool getIsRunning()
{
SafeMutexLock safeLock(&statusMutex);
bool retVal = this->isRunning;
safeLock.unlock();
return retVal;
}
uint16_t getTargetID()
{
return targetID;
}
void getCounters(uint64_t& outNumChunksDiscovered, uint64_t& outNumChunksMatched,
uint64_t& outNumDirsDiscovered, uint64_t& outNumDirsMatched)
{
outNumChunksDiscovered = numChunksDiscovered.read();
outNumChunksMatched = numChunksMatched.read();
outNumDirsDiscovered = numDirsDiscovered.read();
outNumDirsMatched = numDirsMatched.read();
}
void setOnlyTerminateIfIdle(bool value)
{
if (value)
onlyTerminateIfIdle.set(1);
else
onlyTerminateIfIdle.setZero();
}
bool getOnlyTerminateIfIdle()
{
if (onlyTerminateIfIdle.read() == 0)
return false;
else
return true;
}
private:
// getters & setters
void setIsRunning(bool isRunning)
{
SafeMutexLock safeLock(&statusMutex);
this->isRunning = isRunning;
isRunningChangeCond.broadcast();
safeLock.unlock();
}
bool getSelfTerminateNotIdle()
{
return ( (getSelfTerminate() && (!getOnlyTerminateIfIdle())) );
}
};
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerGatherSlave.cpp
BuddyResyncerGatherSlave::BuddyResyncerGatherSlave(MetaSyncCandidateStore* syncCandidates) :
PThread("BuddyResyncerGatherSlave"),
isRunning(false),
syncCandidates(syncCandidates)
{
metaBuddyPath = Program::getApp()->getMetaPath() + "/" CONFIG_BUDDYMIRROR_SUBDIR_NAME;
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.cpp
bool BuddyResyncJob::startGatherSlaves()
{
try
{
gatherSlave->resetSelfTerminate();
gatherSlave->start();
gatherSlave->setIsRunning(true);
}
catch (PThreadCreateException& e)
{
LogContext(__func__).logErr(std::string("Unable to start thread: ") + e.what());
return false;
}
return true;
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerGatherSlave.cpp
void BuddyResyncerGatherSlave::run()
{
setIsRunning(true);
try
{
LOG(DEBUG, "Component started");
registerSignalHandler();
workLoop();
LOG(DEBUG, "Component stopped");
}
catch (std::exception& e)
{
PThread::getCurrentThreadApp()->handleComponentException(e);
}
setIsRunning(false);
}
void BuddyResyncerGatherSlave::workLoop()
{
crawlDir(metaBuddyPath + "/" META_INODES_SUBDIR_NAME, MetaSyncDirType::InodesHashDir);
crawlDir(metaBuddyPath + "/" META_DENTRIES_SUBDIR_NAME, MetaSyncDirType::DentriesHashDir);
}
void BuddyResyncerGatherSlave::crawlDir(const std::string& path, const MetaSyncDirType type,
const unsigned level)
{
LOG_DBG(DEBUG, "Entering hash dir.", level, path);
std::unique_ptr<DIR, StorageTk::CloseDirDeleter> dirHandle(::opendir(path.c_str()));
if (!dirHandle)
{
LOG(ERR, "Unable to open path", path, sysErr());
numErrors.increase();
return;
}
while (!getSelfTerminate())
{
struct dirent* entry;
#if USE_READDIR_R
struct dirent buffer;
const int readRes = ::readdir_r(dirHandle.get(), &buffer, &entry);
#else
errno = 0;
entry = ::readdir(dirHandle.get());
const int readRes = entry ? 0 : errno;
#endif
if (readRes != 0)
{
LOG(ERR, "Could not read dir entry.", path, sysErr(readRes));
numErrors.increase();
return;
}
if (!entry)
break;
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
continue;
const std::string& candidatePath = path + "/" + entry->d_name;
struct stat statBuf;
const int statRes = ::stat(candidatePath.c_str(), &statBuf);
if (statRes)
{
// in a 2nd level dentry hashdir, content directories may disappear - this is not an error,
// it was most likely caused by an rmdir issued by a user.
if (!(errno == ENOENT && type == MetaSyncDirType::DentriesHashDir && level == 2))
{
LOG(ERR, "Could not stat dir entry.", candidatePath, sysErr());
numErrors.increase();
}
continue;
}
if (!S_ISDIR(statBuf.st_mode))
{
LOG(ERR, "Found a non-dir where only directories are expected.", candidatePath,
oct(statBuf.st_mode));
numErrors.increase();
continue;
}
// layout is: (dentries|inodes)/l1/l2/...
// -> level 0 correlates with type
// -> level 1 is not very interesting, except for reporting
// -> level 2 must be synced. if it is a dentry hashdir, its contents must also be crawled.
if (level == 0)
{
crawlDir(candidatePath, type, level + 1);
continue;
}
if (level == 1)
{
LOG_DBG(DEBUG, "Adding hashdir sync candidate.", candidatePath);
addCandidate(candidatePath, type);
if (type == MetaSyncDirType::DentriesHashDir)
crawlDir(candidatePath, type, level + 1);
continue;
}
// so here we read a 2nd level dentry hashdir. crawl that too, add sync candidates for each
// entry we find - non-directories have already been reported, and the bulk resyncer will
// take care of the fsids directories.
numDirsDiscovered.increase();
LOG_DBG(DEBUG, "Adding contdir sync candidate.", candidatePath);
addCandidate(candidatePath, MetaSyncDirType::ContentDir);
}
}
SyncCandidateStore holds two std::list queues, one of MetaSyncCandidateDir and one of MetaSyncCandidateFile entries: the directory queue feeds the bulk resync that runs when primary and secondary have diverged, while the file queue drives the real-time synchronization of individual metadata modifications.
// fhgfs_meta\source\components\buddyresyncer\SyncCandidate.h
typedef SyncCandidateStore<MetaSyncCandidateDir, MetaSyncCandidateFile> MetaSyncCandidateStore;
// fhgfs_common\source\common\storage\mirroring\SyncCandidateStore.h
template <typename SyncCandidateDir, typename SyncCandidateFile>
class SyncCandidateStore
{
public:
SyncCandidateStore()
: numQueuedFiles(0), numQueuedDirs(0)
{ }
private:
typedef std::list<SyncCandidateFile> CandidateFileList;
CandidateFileList candidatesFile;
Mutex candidatesFileMutex;
Condition filesAddedCond;
Condition filesFetchedCond;
typedef std::list<SyncCandidateDir> CandidateDirList;
CandidateDirList candidatesDir;
Mutex candidatesDirMutex;
Condition dirsAddedCond;
Condition dirsFetchedCond;
// mainly used to avoid constant calling of size() method of lists
unsigned numQueuedFiles;
unsigned numQueuedDirs;
static const unsigned MAX_QUEUE_SIZE = 50000;
public:
void add(SyncCandidateFile entry, PThread* caller)
{
static const unsigned waitTimeoutMS = 1000;
std::lock_guard<Mutex> mutexLock(candidatesFileMutex);
// wait if list is too big
while (numQueuedFiles > MAX_QUEUE_SIZE)
{
if (caller && unlikely(caller->getSelfTerminate() ) )
break; // ignore limit if selfTerminate was set to avoid hanging on shutdown
filesFetchedCond.timedwait(&candidatesFileMutex, waitTimeoutMS);
}
this->candidatesFile.push_back(std::move(entry));
numQueuedFiles++;
filesAddedCond.signal();
}
void fetch(SyncCandidateFile& outCandidate, PThread* caller)
{
static const unsigned waitTimeMS = 3000;
std::lock_guard<Mutex> mutexLock(candidatesFileMutex);
while (candidatesFile.empty() )
{
if(caller && unlikely(caller->getSelfTerminate() ) )
{
outCandidate = SyncCandidateFile();
return;
}
filesAddedCond.timedwait(&candidatesFileMutex, waitTimeMS);
}
outCandidate = std::move(candidatesFile.front());
candidatesFile.pop_front();
numQueuedFiles--;
filesFetchedCond.signal();
}
void add(SyncCandidateDir entry, PThread* caller)
{
static const unsigned waitTimeoutMS = 3000;
std::lock_guard<Mutex> mutexLock(candidatesDirMutex);
// wait if list is too big
while (numQueuedDirs > MAX_QUEUE_SIZE)
{
if (caller && unlikely(caller->getSelfTerminate() ) )
break; // ignore limit if selfTerminate was set to avoid hanging on shutdown
dirsFetchedCond.timedwait(&candidatesDirMutex, waitTimeoutMS);
}
this->candidatesDir.push_back(std::move(entry));
numQueuedDirs++;
dirsAddedCond.signal();
}
bool waitForFiles(PThread* caller)
{
static const unsigned waitTimeoutMS = 3000;
std::lock_guard<Mutex> mutexLock(candidatesFileMutex);
while (numQueuedFiles == 0)
{
if (caller && caller->getSelfTerminate())
return false;
filesAddedCond.timedwait(&candidatesFileMutex, waitTimeoutMS);
}
return true;
}
void fetch(SyncCandidateDir& outCandidate, PThread* caller)
{
static const unsigned waitTimeMS = 3000;
std::lock_guard<Mutex> mutexLock(candidatesDirMutex);
while (candidatesDir.empty() )
{
if(caller && unlikely(caller->getSelfTerminate() ) )
{
outCandidate = SyncCandidateDir();
return;
}
dirsAddedCond.timedwait(&candidatesDirMutex, waitTimeMS);
}
outCandidate = std::move(candidatesDir.front());
candidatesDir.pop_front();
numQueuedDirs--;
dirsFetchedCond.signal();
}
bool isFilesEmpty()
{
std::lock_guard<Mutex> mutexLock(candidatesFileMutex);
return candidatesFile.empty();
}
bool isDirsEmpty()
{
std::lock_guard<Mutex> mutexLock(candidatesDirMutex);
return candidatesDir.empty();
}
void waitForFiles(const unsigned timeoutMS)
{
std::lock_guard<Mutex> mutexLock(candidatesFileMutex);
if (candidatesFile.empty() )
{
if (timeoutMS == 0)
filesAddedCond.wait(&candidatesFileMutex);
else
filesAddedCond.timedwait(&candidatesFileMutex, timeoutMS);
}
}
void waitForDirs(const unsigned timeoutMS)
{
std::lock_guard<Mutex> mutexLock(candidatesDirMutex);
if (candidatesDir.empty() )
{
if (timeoutMS == 0)
dirsAddedCond.wait(&candidatesDirMutex);
else
dirsAddedCond.timedwait(&candidatesDirMutex, timeoutMS);
}
}
size_t getNumFiles()
{
std::lock_guard<Mutex> mutexLock(candidatesFileMutex);
return candidatesFile.size();
}
size_t getNumDirs()
{
std::lock_guard<Mutex> mutexLock(candidatesDirMutex);
return candidatesDir.size();
}
void clear()
{
{
std::lock_guard<Mutex> dirMutexLock(candidatesDirMutex);
candidatesDir.clear();
numQueuedDirs = 0;
}
{
std::lock_guard<Mutex> fileMutexLock(candidatesFileMutex);
candidatesFile.clear();
numQueuedFiles = 0;
}
}
};
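As described above, the store is essentially a pair of bounded producer/consumer queues with backpressure: add() waits in timed slices while the queue holds more than MAX_QUEUE_SIZE entries, fetch() waits while it is empty, and both paths re-check the calling thread's self-terminate flag so shutdown cannot hang. A minimal standalone equivalent for a single queue, using the standard library and an std::atomic<bool> stop flag in place of PThread::getSelfTerminate():
// Illustrative bounded-queue sketch, not the BeeGFS SyncCandidateStore.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>

template <typename T>
class BoundedCandidateQueueSketch
{
   public:
      void add(T entry, const std::atomic<bool>& stopRequested)
      {
         std::unique_lock<std::mutex> lock(mutex);

         // backpressure: wait while the queue is over the limit, unless we are shutting down
         while (queue.size() > MAX_QUEUE_SIZE && !stopRequested.load())
            fetchedCond.wait_for(lock, std::chrono::seconds(1));

         queue.push_back(std::move(entry));
         addedCond.notify_one();
      }

      // returns false (leaving out untouched) if the caller should terminate instead
      bool fetch(T& out, const std::atomic<bool>& stopRequested)
      {
         std::unique_lock<std::mutex> lock(mutex);

         while (queue.empty())
         {
            if (stopRequested.load())
               return false;

            addedCond.wait_for(lock, std::chrono::seconds(3));
         }

         out = std::move(queue.front());
         queue.pop_front();
         fetchedCond.notify_one();
         return true;
      }

   private:
      static constexpr size_t MAX_QUEUE_SIZE = 50000;

      std::mutex mutex;
      std::condition_variable addedCond;
      std::condition_variable fetchedCond;
      std::list<T> queue;
};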
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.cpp
bool BuddyResyncJob::startSyncSlaves()
{
App* app = Program::getApp();
const NumNodeID localNodeID = app->getLocalNodeNumID();
const NumNodeID buddyNodeID(
app->getMetaBuddyGroupMapper()->getBuddyTargetID(localNodeID.val(), NULL) );
for (size_t i = 0; i < bulkSyncSlaves.size(); i++)
{
try
{
bulkSyncSlaves[i]->resetSelfTerminate();
bulkSyncSlaves[i]->start();
bulkSyncSlaves[i]->setIsRunning(true);
}
catch (PThreadCreateException& e)
{
LogContext(__func__).logErr(std::string("Unable to start thread: ") + e.what() );
for (size_t j = 0; j < i; j++)
bulkSyncSlaves[j]->selfTerminate();
return false;
}
}
try
{
modSyncSlave->resetSelfTerminate();
modSyncSlave->start();
modSyncSlave->setIsRunning(true);
}
catch (PThreadCreateException& e)
{
LogContext(__func__).logErr(std::string("Unable to start thread: ") + e.what() );
for (size_t j = 0; j < bulkSyncSlaves.size(); j++)
bulkSyncSlaves[j]->selfTerminate();
return false;
}
return true;
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerBulkSyncSlave.cpp
void BuddyResyncerBulkSyncSlave::syncLoop()
{
EntryLockStore* const lockStore = Program::getApp()->getMirroredSessions()->getEntryLockStore();
while (!getSelfTerminateNotIdle())
{
MetaSyncCandidateDir candidate;
syncCandidates->fetch(candidate, this);
// the sync candidate we have retrieved may be invalid if this thread was ordered to
// terminate and the sync candidate store has no more directories queued for us.
// in this case, we may end the sync because we have no more candidates, and the resync job
// guarantees that all gather threads have completed before the bulk syncers are ordered to
// finish.
if (syncCandidates->isDirsEmpty() && candidate.getRelativePath().empty() &&
getSelfTerminate())
return;
if (candidate.getType() == MetaSyncDirType::InodesHashDir ||
candidate.getType() == MetaSyncDirType::DentriesHashDir)
{
// lock the hash path in accordance with MkLocalDir, RmLocalDir and RmDir.
const auto& hashDir = candidate.getRelativePath();
auto slash1 = hashDir.find('/');
auto slash2 = hashDir.find('/', slash1 + 1);
auto hash1 = StringTk::strHexToUInt(hashDir.substr(slash1 + 1, slash2 - slash1 - 1));
auto hash2 = StringTk::strHexToUInt(hashDir.substr(slash2 + 1));
HashDirLock hashLock = {lockStore, {hash1, hash2}};
const FhgfsOpsErr resyncRes = resyncDirectory(candidate, "");
if (resyncRes == FhgfsOpsErr_SUCCESS)
continue;
numDirErrors.increase();
parentJob->abort();
return;
}
// not a hash dir, so it must be a content directory. sync the #fSiDs# first, then the actual
// content directory. we lock the directory inode the content directory belongs to because we
// must not allow a concurrent meta action to delete the content directory while we are
// resyncing it. concurrent modification of directory contents could be allowed, though.
const std::string dirInodeID = Path(candidate.getRelativePath()).back();
const std::string fullPath = META_BUDDYMIRROR_SUBDIR_NAME "/" + candidate.getRelativePath();
DirIDLock dirLock(lockStore, dirInodeID, false);
// first ensure that the directory still exists - a concurrent modification may have deleted
// it. this would not be an error; bulk resync should not touch it, a modification sync
// would remove it completely.
if (::access(fullPath.c_str(), F_OK) != 0 && errno == ENOENT)
{
numDirsSynced.increase(); // Count it anyway, so the sums match up.
continue;
}
MetaSyncCandidateDir fsIDs(
candidate.getRelativePath() + "/" + META_DIRENTRYID_SUB_STR,
MetaSyncDirType::InodesHashDir);
FhgfsOpsErr resyncRes = resyncDirectory(fsIDs, dirInodeID);
if (resyncRes == FhgfsOpsErr_SUCCESS)
resyncRes = resyncDirectory(candidate, dirInodeID);
if (resyncRes != FhgfsOpsErr_SUCCESS)
{
numDirErrors.increase();
parentJob->abort();
return;
}
else
{
numDirsSynced.increase();
}
}
}
FhgfsOpsErr BuddyResyncerBulkSyncSlave::resyncDirectory(const MetaSyncCandidateDir& root,
const std::string& inodeID)
{
StreamCandidateArgs args(*this, root, inodeID);
return resyncAt(Path(root.getRelativePath()), true, streamCandidateDir, &args);
}
FhgfsOpsErr BuddyResyncerBulkSyncSlave::streamCandidateDir(Socket& socket,
const MetaSyncCandidateDir& candidate, const std::string& inodeID)
{
EntryLockStore* const lockStore = Program::getApp()->getMirroredSessions()->getEntryLockStore();
Path candidatePath(META_BUDDYMIRROR_SUBDIR_NAME "/" + candidate.getRelativePath());
std::unique_ptr<DIR, StorageTk::CloseDirDeleter> dir(opendir(candidatePath.str().c_str()));
if (!dir)
{
LOG(ERR, "Could not open candidate directory.", candidatePath, sysErr());
return FhgfsOpsErr_INTERNAL;
}
int dirFD = ::dirfd(dir.get());
if (dirFD < 0)
{
LOG(ERR, "Could not open candidate directory.", candidatePath, sysErr());
return FhgfsOpsErr_INTERNAL;
}
while (true)
{
struct dirent* entry;
#if USE_READDIR_R
struct dirent entryBuf;
int err = ::readdir_r(dir.get(), &entryBuf, &entry);
#else
errno = 0;
entry = readdir(dir.get());
int err = entry ? 0 : errno;
#endif
if (err > 0)
{
LOG(ERR, "Could not read candidate directory.", candidatePath, sysErr());
numDirErrors.increase();
break;
}
if (!entry)
break;
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
continue;
struct stat statData;
if (::fstatat(dirFD, entry->d_name, &statData, AT_SYMLINK_NOFOLLOW) < 0)
{
// the file/directory may have gone away. this is not an error, and the secondary will
// delete the file/directory as well.
if (errno == ENOENT)
continue;
LOG(ERR, "Could not stat resync candidate.", candidatePath, entry->d_name, sysErr());
numFileErrors.increase();
continue;
}
if (!S_ISDIR(statData.st_mode) && !S_ISREG(statData.st_mode))
{
LOG(ERR, "Resync candidate is neither file nor directory.",
candidatePath, entry->d_name, statData.st_mode);
numFileErrors.increase();
continue;
}
if (candidate.getType() == MetaSyncDirType::ContentDir)
{
// if it's in a content directory and a directory, it can really only be the fsids dir.
// locking for this case is already sorted, so we only have to transfer the (empty)
// inode metadata to tell the secondary that the directory may stay.
if (S_ISDIR(statData.st_mode))
{
const FhgfsOpsErr streamRes = streamInode(socket, Path(entry->d_name), true);
if (streamRes != FhgfsOpsErr_SUCCESS)
return streamRes;
}
else
{
ParentNameLock dentryLock(lockStore, inodeID, entry->d_name);
const auto streamRes = streamDentry(socket, Path(), entry->d_name);
if (streamRes != FhgfsOpsErr_SUCCESS)
{
numFileErrors.increase();
return streamRes;
}
else
{
numFilesSynced.increase();
}
}
continue;
}
// we are now either in a fsids (file inode) directory or a second-level inode hash-dir,
// which may contain either file or directory inodes. taking a lock unnecessarily is still
// cheaper than reading the inode from disk to determine its type, so just lock the inode id
// as both file and directory
FileIDLock fileLock(lockStore, entry->d_name);
DirIDLock dirLock(lockStore, entry->d_name, true);
// access the file once more, because it may have been deleted in the meantime. a new entry
// with the same name cannot appear in a sane filesystem (that would indicate an ID being
// reused).
if (faccessat(dirFD, entry->d_name, F_OK, 0) < 0 && errno == ENOENT)
continue;
const FhgfsOpsErr streamRes = streamInode(socket, Path(entry->d_name),
S_ISDIR(statData.st_mode));
if (streamRes != FhgfsOpsErr_SUCCESS)
{
numFileErrors.increase();
return streamRes;
}
else
{
numFilesSynced.increase();
}
}
return sendResyncPacket(socket, std::tuple<>());
}
// fhgfs_meta\source\components\buddyresyncer\SyncSlaveBase.cpp
FhgfsOpsErr SyncSlaveBase::resyncAt(const Path& basePath, bool wholeDirectory,
FhgfsOpsErr (*streamFn)(Socket*, void*), void* context)
{
const bool sendXAttrs = Program::getApp()->getConfig()->getStoreClientXAttrs();
this->basePath = META_BUDDYMIRROR_SUBDIR_NAME / basePath;
ResyncRawInodesMsgEx msg(basePath, sendXAttrs, wholeDirectory);
RequestResponseNode rrNode(buddyNodeID, Program::getApp()->getMetaNodes());
RequestResponseArgs rrArgs(nullptr, &msg, NETMSGTYPE_ResyncRawInodesResp,
streamFn, context);
// resync processing may take a very long time for each step, eg if a very large directory must
// be cleaned out on the secondary. do not use timeouts for resync communication right now.
// TODO move long-running tasks on the secondary into own threads and have the secondary notify
// the primary on completion instead of having the primary poll the secondary
rrArgs.minTimeoutMS = -1;
const auto commRes = MessagingTk::requestResponseNode(&rrNode, &rrArgs);
if (commRes != FhgfsOpsErr_SUCCESS)
{
LOG(ERR, "Error during communication with secondary.", commRes);
return commRes;
}
const auto resyncRes = static_cast<ResyncRawInodesRespMsg*>(rrArgs.outRespMsg)->getResult();
if (resyncRes != FhgfsOpsErr_SUCCESS)
LOG(ERR, "Error while resyncing directory.", basePath, resyncRes);
return resyncRes;
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerModSyncSlave.h
class BuddyResyncerModSyncSlave : public SyncSlaveBase
{
friend class BuddyResyncer;
friend class BuddyResyncJob;
public:
BuddyResyncerModSyncSlave(BuddyResyncJob& parentJob, MetaSyncCandidateStore* syncCandidates,
uint8_t slaveID, const NumNodeID& buddyNodeID);
struct Stats
{
uint64_t objectsSynced;
uint64_t errors;
};
Stats getStats()
{
return Stats{ numObjectsSynced.read(), numErrors.read() };
}
private:
MetaSyncCandidateStore* syncCandidates;
AtomicUInt64 numObjectsSynced;
AtomicUInt64 numErrors;
void syncLoop();
FhgfsOpsErr streamCandidates(Socket& socket);
private:
static FhgfsOpsErr streamCandidates(Socket* socket, void* context)
{
return static_cast<BuddyResyncerModSyncSlave*>(context)->streamCandidates(*socket);
}
};
// fhgfs_meta\source\net\message\MirroredMessage.h
template<typename BaseT, typename LockStateT>
class MirroredMessage : public BaseT
{
void finishOperation(NetMessage::ResponseContext& ctx,
std::unique_ptr<MirroredMessageResponseState> state)
{
auto* responsePtr = state.get();
bool buddyCommSuccessful = true;
if (isMirrored() &&
!this->hasFlag(NetMessageHeader::Flag_BuddyMirrorSecond) &&
state)
{
if (state->changesObservableState())
buddyCommSuccessful = forwardToSecondary(ctx);
else
buddyCommSuccessful = notifySecondaryOfACK(ctx);
}
if (mirrorState)
mirrorState->response = std::move(state);
// pairs with the memory barrier before acquireMirrorStateSlot
__sync_synchronize();
if (BuddyResyncer::getSyncChangeset())
{
if (isMirrored() &&
!this->hasFlag(NetMessageHeader::Flag_BuddyMirrorSecond) &&
responsePtr &&
responsePtr->changesObservableState())
BuddyResyncer::commitThreadChangeSet();
else
BuddyResyncer::abandonSyncChangeset();
}
if (responsePtr && buddyCommSuccessful)
responsePtr->sendResponse(ctx);
else if (!buddyCommSuccessful)
ctx.sendResponse(
GenericResponseMsg(
GenericRespMsgCode_INDIRECTCOMMERR_NOTAGAIN,
"Communication with secondary failed"));
lockState = {};
}
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncer.cpp
void BuddyResyncer::commitThreadChangeSet()
{
BEEGFS_BUG_ON(!currentThreadChangeSet, "no change set active");
auto* job = Program::getApp()->getBuddyResyncer()->getResyncJob();
std::unique_ptr<MetaSyncCandidateFile> candidate(currentThreadChangeSet);
currentThreadChangeSet = nullptr;
Barrier syncDone(2);
candidate->prepareSignal(syncDone);
job->enqueue(std::move(*candidate), PThread::getCurrentThread());
syncDone.wait();
}
// beegfs-6.18\fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.h
class BuddyResyncJob : public PThread
{
public:
void enqueue(MetaSyncCandidateFile syncCandidate, PThread* caller)
{
syncCandidates.add(std::move(syncCandidate), caller);
}
};
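commitThreadChangeSet() hands the changeset to the resync job's file queue via enqueue() and then blocks on a two-party Barrier until the mod sync slave (or the drain loop at the end of BuddyResyncJob::run()) calls candidate.signal(), so a worker does not answer the client before its modification has been handled. A standalone sketch of that rendezvous, with a promise/future standing in for the BeeGFS Barrier; names and the queue interface are illustrative:
// Illustrative rendezvous sketch, not the BeeGFS Barrier / MetaSyncCandidateFile code.
#include <future>
#include <utility>

struct ChangeSetSketch
{
   // ... description of the modified metadata objects would go here ...
   std::promise<void> done; // fulfilled once the candidate has been processed
};

// Worker-thread side: hand the changeset to the mod sync queue and block until handled.
template <typename Queue>
void commitChangeSetSketch(Queue& modSyncQueue, ChangeSetSketch changeSet)
{
   std::future<void> processed = changeSet.done.get_future();

   modSyncQueue.push(std::move(changeSet)); // corresponds to job->enqueue(...)

   processed.wait(); // corresponds to syncDone.wait() in commitThreadChangeSet()
}

// Mod-sync-slave side: after streaming (or draining) the candidate, release the worker.
void signalProcessedSketch(ChangeSetSketch& changeSet)
{
   changeSet.done.set_value(); // corresponds to candidate.signal()
}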
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerModSyncSlave.cpp
void BuddyResyncerModSyncSlave::syncLoop()
{
while (!getSelfTerminateNotIdle())
{
if (syncCandidates->waitForFiles(this))
resyncAt(Path(), false, streamCandidates, this);
else if (getOnlyTerminateIfIdle())
break;
}
}
// fhgfs_meta\source\net\message\storage\mirroring\ResyncRawInodesMsgEx.cpp
bool ResyncRawInodesMsgEx::processIncoming(ResponseContext& ctx)
{
LOG_DBG(DEBUG, "Received a ResyncRawInodesMsg.", as("peer", ctx.getSocket()->getPeername()),
basePath, hasXAttrs, wholeDirectory);
const FhgfsOpsErr resyncRes = resyncStream(ctx);
ctx.sendResponse(ResyncRawInodesRespMsg(resyncRes));
return resyncRes == FhgfsOpsErr_SUCCESS;
}
FhgfsOpsErr ResyncRawInodesMsgEx::resyncStream(ResponseContext& ctx)
{
if (hasXAttrs && !Program::getApp()->getConfig()->getStoreClientXAttrs())
{
LOG(ERR, "Primary has indicated xattr resync, but xattrs are disabled in config.");
return FhgfsOpsErr_NOTSUPP;
}
auto* const metaBGM = Program::getApp()->getMetaBuddyGroupMapper();
auto* const metaNodes = Program::getApp()->getMetaNodes();
auto* const rootDir = Program::getApp()->getRootDir();
// if the local root is not buddyMirrored yet, set local buddy mirroring for the root inode.
if (metaNodes->getRootNodeNumID().val() == metaBGM->getLocalGroupID() &&
!rootDir->getIsBuddyMirrored())
{
const auto setMirrorRes = SetMetadataMirroringMsgEx::setMirroring();
if (setMirrorRes != FhgfsOpsErr_SUCCESS)
{
LOG(ERR, "Failed to set meta mirroring on the root directory", setMirrorRes);
return setMirrorRes;
}
}
// if our path is a directory, we must create it now, otherwise, the directory may not be
// created at all. for example the #fSiDs# directory in an empty content directory with no
// orphaned fsids would not be created.
if (wholeDirectory)
{
const auto mkRes = Program::getApp()->getMetaStore()->beginResyncFor(
META_BUDDYMIRROR_SUBDIR_NAME / basePath, true);
if (mkRes.first != FhgfsOpsErr_SUCCESS)
{
LOG(ERR, "Failed to create metadata directory.", basePath, as("mkRes", mkRes.first));
return mkRes.first;
}
}
while (true)
{
const auto resyncPartRes = resyncSingle(ctx);
if (resyncPartRes == FhgfsOpsErr_AGAIN)
continue;
else if (resyncPartRes == FhgfsOpsErr_SUCCESS)
break;
return resyncPartRes;
}
const FhgfsOpsErr result = wholeDirectory
? removeUntouchedInodes()
: FhgfsOpsErr_SUCCESS;
return result;
}
FhgfsOpsErr ResyncRawInodesMsgEx::resyncSingle(ResponseContext& ctx)
{
uint32_t packetLength;
ctx.getSocket()->recvExact(&packetLength, sizeof(packetLength), 0);
packetLength = LE_TO_HOST_32(packetLength);
if (packetLength == 0)
return FhgfsOpsErr_SUCCESS;
std::unique_ptr<char[]> packetData(new char[packetLength]);
ctx.getSocket()->recvExact(packetData.get(), packetLength, 0);
Deserializer des(packetData.get(), packetLength);
MetaSyncFileType packetType;
std::string relPath;
des
% packetType
% relPath;
if (!des.good())
{
LOG(ERR, "Received bad data from primary.");
return FhgfsOpsErr_INTERNAL;
}
if (wholeDirectory)
inodesWritten.push_back(relPath);
FhgfsOpsErr result;
switch (packetType)
{
case MetaSyncFileType::Inode:
case MetaSyncFileType::Directory:
result = resyncInode(ctx, basePath / relPath, des,
packetType == MetaSyncFileType::Directory);
break;
case MetaSyncFileType::Dentry:
result = resyncDentry(ctx, basePath / relPath, des);
break;
default:
result = FhgfsOpsErr_INVAL;
}
ctx.sendResponse(ResyncRawInodesRespMsg(result));
// if the resync has failed, we have to return the result twice - once as an ACK for the packet,
// and another time to terminate the stream. mod sync could do without the termination, but
// bulk resync can't.
if (result == FhgfsOpsErr_SUCCESS)
return FhgfsOpsErr_AGAIN;
else
return result;
}
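From the receiver code the wire format of the resync stream can be inferred: each element is a little-endian uint32 length prefix followed by that many bytes of serialized payload (file type, relative path, then type-specific fields), a length of 0 terminates the stream, and the secondary acknowledges every element with a ResyncRawInodesRespMsg. The following is a rough primary-side framing sketch; the field encoding is simplified and the real code uses the BeeGFS serializer and sendResyncPacket():
// Illustrative framing sketch, not the BeeGFS serializer / sendResyncPacket implementation.
#include <cstdint>
#include <string>
#include <vector>

// append a little-endian uint32 to the buffer
static void putLE32(std::vector<char>& buf, uint32_t value)
{
   for (int i = 0; i < 4; i++)
      buf.push_back(static_cast<char>((value >> (8 * i)) & 0xff));
}

// build one framed resync packet: [length][type][relPath][payload...]
// (field encoding is simplified; the real layout is defined by the BeeGFS Deserializer)
static std::vector<char> buildResyncPacketSketch(uint8_t packetType, const std::string& relPath,
   const std::vector<char>& payload)
{
   std::vector<char> body;
   body.push_back(static_cast<char>(packetType));
   putLE32(body, static_cast<uint32_t>(relPath.size()));
   body.insert(body.end(), relPath.begin(), relPath.end());
   body.insert(body.end(), payload.begin(), payload.end());

   std::vector<char> packet;
   putLE32(packet, static_cast<uint32_t>(body.size())); // length prefix read by resyncSingle()
   packet.insert(packet.end(), body.begin(), body.end());
   return packet;
}

// the stream ends with a zero-length packet (packetLength == 0 on the receiver)
static std::vector<char> buildEndOfStreamSketch()
{
   std::vector<char> packet;
   putLE32(packet, 0);
   return packet;
}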
FhgfsOpsErr ResyncRawInodesMsgEx::resyncInode(ResponseContext& ctx, const Path& path,
Deserializer& data, const bool isDirectory, const bool recvXAttrs)
{
std::vector<char> content;
bool isDeletion;
data
% content
% isDeletion;
if (!data.good())
{
LOG(ERR, "Received bad data from primary.");
return FhgfsOpsErr_INTERNAL;
}
if (isDeletion)
{
const bool rmRes = isDirectory
? StorageTk::removeDirRecursive((META_BUDDYMIRROR_SUBDIR_NAME / path).str())
: unlink((META_BUDDYMIRROR_SUBDIR_NAME / path).str().c_str()) == 0;
if (rmRes || errno == ENOENT)
return FhgfsOpsErr_SUCCESS;
LOG(ERR, "Failed to remove raw meta inode.", path, sysErr());
return FhgfsOpsErr_INTERNAL;
}
if (!isDirectory && wholeDirectory)
{
const auto unlinkRes = Program::getApp()->getMetaStore()->unlinkRawMetadata(
META_BUDDYMIRROR_SUBDIR_NAME / path);
if (unlinkRes != FhgfsOpsErr_SUCCESS && unlinkRes != FhgfsOpsErr_PATHNOTEXISTS)
{
LOG(ERR, "Could not unlink raw metadata", path, unlinkRes);
return FhgfsOpsErr_INTERNAL;
}
}
auto inode = Program::getApp()->getMetaStore()->beginResyncFor(
META_BUDDYMIRROR_SUBDIR_NAME / path, isDirectory);
if (inode.first)
return inode.first;
if (!isDirectory)
{
const auto setContentRes = inode.second.setContent(&content[0], content.size());
if (setContentRes)
return setContentRes;
}
if (!hasXAttrs || !recvXAttrs)
return FhgfsOpsErr_SUCCESS;
const auto xattrRes = resyncInodeXAttrs(ctx, inode.second);
if (xattrRes != FhgfsOpsErr_SUCCESS)
{
LOG(ERR, "Syncing XAttrs failed.", path, xattrRes);
return xattrRes;
}
return FhgfsOpsErr_SUCCESS;
}
FhgfsOpsErr ResyncRawInodesMsgEx::resyncDentry(ResponseContext& ctx, const Path& path,
Deserializer& data)
{
bool linksToFsID;
data % linksToFsID;
if (!data.good())
{
LOG(ERR, "Received bad data from primary.");
return FhgfsOpsErr_INTERNAL;
}
// dentries with independent contents (dir dentries, dentries to non-inlined files) can be
// treated like inodes for the purpose of resync. don't sync xattrs though, because dentries
// should never have them
if (!linksToFsID)
return resyncInode(ctx, path, data, false, false);
std::string targetID;
bool isDeletion;
data
% targetID
% isDeletion;
if (!data.good())
{
LOG(ERR, "Received bad data from primary.");
return FhgfsOpsErr_INTERNAL;
}
const FhgfsOpsErr rmRes = Program::getApp()->getMetaStore()->unlinkRawMetadata(
META_BUDDYMIRROR_SUBDIR_NAME / path);
if (rmRes != FhgfsOpsErr_SUCCESS && rmRes != FhgfsOpsErr_PATHNOTEXISTS)
{
LOG(ERR, "Could not unlink old dentry.", path, rmRes);
return FhgfsOpsErr_INTERNAL;
}
if (isDeletion)
return FhgfsOpsErr_SUCCESS;
const Path& idPath = path.dirname() / META_DIRENTRYID_SUB_STR / targetID;
const int linkRes = ::link(
(META_BUDDYMIRROR_SUBDIR_NAME / idPath).str().c_str(),
(META_BUDDYMIRROR_SUBDIR_NAME / path).str().c_str());
if (linkRes < 0)
{
LOG(ERR, "Could not link dentry to fsid.", path, idPath, sysErr());
return FhgfsOpsErr_INTERNAL;
}
return FhgfsOpsErr_SUCCESS;
}