前言
- 我们在Libvirt同步机制 —— 设计原理中详细分析了Libvirt项目涉及到的几种需要多线程同步的场景以及基于这些场景Libvirt设计的对应同步机制,本文以此为基础,继续分析这些同步机制的数据结构设计,接口设计以及流程实现
VM同步
数据结构
- 虚机的数据结构如下,
parent
字段指向了VM的基类,保存了用于同步VM数据结构的互斥锁。privateData
字段保存了不同驱动的私有数据,lxc和qemu各不相同,qemu驱动中该指针为qemuDomainObjPrivatePtr
类型,qemuDomainObjPrivate.job
保存了任务相关数据结构。
struct _virDomainObj {
virObjectLockable parent;
......
void *privateData;
......
}
typedef struct _virDomainObj virDomainObj;
接口
virDomainObjPtr qemuDomainObjFromDomain(virDomainPtr domain)
virDomainObjPtr virDomainObjListFindByUUID(virDomainObjListPtr doms, const unsigned char *uuid)
virDomainObjPtr virDomainObjListFindByName(virDomainObjListPtr doms, const char *name)
同步任务
数据结构
qemuDomainJob
qemuDomainJob
定义了对虚拟机操作的所有同步任务,其中QEMU_JOB_NONE
表示没有任务,QEMU_JOB_ASYNC
比较特殊,当异步任务发起时需要标记虚机的同步任务为QEMU_JOB_ASYNC
,Libvirt要求针对同一个VM,同一时间只能允许一下任务中的其中一个执行。
/* Only 1 job is allowed at any time
* A job includes *all* monitor commands, even those just querying
* information, not merely actions */
typedef enum {
QEMU_JOB_NONE = 0, /* Always set to 0 for easy if (jobActive) conditions */
QEMU_JOB_QUERY, /* Doesn't change any state */
QEMU_JOB_DESTROY, /* Destroys the domain (cannot be masked out) */
QEMU_JOB_SUSPEND, /* Suspends (stops vCPUs) the domain */
QEMU_JOB_MODIFY, /* May change state */
QEMU_JOB_ABORT, /* Abort current async job */
QEMU_JOB_MIGRATION_OP, /* Operation influencing outgoing migration */
/* The following two items must always be the last items before JOB_LAST */
QEMU_JOB_ASYNC, /* Asynchronous job */
QEMU_JOB_ASYNC_NESTED, /* Normal job within an async job */
QEMU_JOB_LAST
} qemuDomainJob;
qemuDomainJobObj
qemuDomainJobObj
是描述任务的数据结构,抽象了所有同步、异步、嵌套异步任务的信息。
struct _qemuDomainJobObj {
/* 同步任务信号量,当VM有同步任务在执行时
* 新发起的任务会等待在该信号量上,直到当前任务结束后被唤醒
* */
virCond cond; /* Use to coordinate jobs */
/* The following members are for QEMU_JOB_* */
/* 保存当前虚机正在执行的同步任务,如果没有,为QEMU_JOB_NONE */
qemuDomainJob active; /* Currently running job */
/* 执行当前任务的线程ID */
unsigned long long owner; /* Thread id which set current job */
const char *ownerAPI; /* The API which owns the job */
/* 开始执行该任务的时间戳 */
unsigned long long started; /* When the current job started */
......
}
接口
qemuDomainObjBeginJob
—— 标记同步任务开始
int qemuDomainObjBeginJob(virQEMUDriverPtr driver,
virDomainObjPtr obj,
qemuDomainJob job)
qemuDomainObjEndJob
—— 标记同步任务结束
void qemuDomainObjEndJob(virQEMUDriverPtr driver, virDomainObjPtr obj)
异步任务
数据结构
qemuDomainAsyncJob
- 异步任务允许在mask中的其它任务并发执行。并发任务可以是同步任务,也可以是异步任务。
/* Async job consists of a series of jobs that may change state. Independent
* jobs that do not change state (and possibly others if explicitly allowed by
* current async job) are allowed to be run even if async job is active.
*/
typedef enum {
QEMU_ASYNC_JOB_NONE = 0,
/* 迁移任务:迁出 */
QEMU_ASYNC_JOB_MIGRATION_OUT,
/* 迁移任务:迁入 */
QEMU_ASYNC_JOB_MIGRATION_IN,
QEMU_ASYNC_JOB_SAVE,
QEMU_ASYNC_JOB_DUMP,
/* 快照任务 */
QEMU_ASYNC_JOB_SNAPSHOT,
QEMU_ASYNC_JOB_START,
QEMU_ASYNC_JOB_BACKUP,
QEMU_ASYNC_JOB_LAST
} qemuDomainAsyncJob;
qemuDomainJobObj
qemuDomainJobObj
中除了定义同步任务,还包含了异步任务的信息,如下:
struct _qemuDomainJobObj {
......
/* The following members are for QEMU_ASYNC_JOB_* */
/* 异步任务信号量,当VM上有异步任务在执行时
* 新发起的任务如果不在当前任务允许并发执行的列表里,需要等待在该信号量上直到当前任务结束
* */
virCond asyncCond; /* Use to coordinate with async jobs */
/* 保存当前正在执行的异步任务 */
qemuDomainAsyncJob asyncJob; /* Currently active async job */
/* 异步任务的线程ID */
unsigned long long asyncOwner; /* Thread which set current async job */
const char *asyncOwnerAPI; /* The API which owns the async job */
/* 异步任务开始时间戳 */
unsigned long long asyncStarted; /* When the current async job started */
int phase; /* Job phase (mainly for migrations) */
/* 异步任务执行过程中,允许并发执行其它任务的任务白名单*/
unsigned long long mask; /* Jobs allowed during async job */
/* 异步任务信息 */
qemuDomainJobInfoPtr current; /* async job progress data */
qemuDomainJobInfoPtr completed; /* statistics data of a recently completed job */
......
}
qemuDomainJobStatus
- 异步任务是耗时比较长的任务,通常Libvirt会通过主动查询或者被动等待Qemu的事件,来检查任务是否完成,为了识别当前异步任务的状态,Libvirt定义了
qemuDomainJobStatus
,当Qemu执行这个异步任务有事件更新时,Libvirt可以用这个类型的变量来保存,方便外部的用户查询或者其它API逻辑做判断。
typedef enum {
QEMU_DOMAIN_JOB_STATUS_NONE = 0,
/* 任务正在执行中 */
QEMU_DOMAIN_JOB_STATUS_ACTIVE,
/* 迁移任务正在执行中 */
QEMU_DOMAIN_JOB_STATUS_MIGRATING,
/* QEMU侧已经完成了异步任务,但此时Libvirt可能没有结束 */
QEMU_DOMAIN_JOB_STATUS_QEMU_COMPLETED,
QEMU_DOMAIN_JOB_STATUS_PAUSED,
/* 迁移任务:处于postcopy阶段 */
QEMU_DOMAIN_JOB_STATUS_POSTCOPY,
/* Libvirt已经完成该任务 */
QEMU_DOMAIN_JOB_STATUS_COMPLETED,
QEMU_DOMAIN_JOB_STATUS_FAILED,
QEMU_DOMAIN_JOB_STATUS_CANCELED,
} qemuDomainJobStatus;
qemuDomainJobInfo
- Libvirt把异步任务相关的具体信息抽象成
qemuDomainJobInfo
,如下:
typedef struct _qemuDomainJobInfo qemuDomainJobInfo;
struct _qemuDomainJobInfo {
/* 任务状态 */
qemuDomainJobStatus status;
virDomainJobOperation operation;
/* 任务开始时间戳 */
unsigned long long started; /* When the async job started */
unsigned long long stopped; /* When the domain's CPUs were stopped */
unsigned long long sent; /* When the source sent status info to the
destination (only for migrations). */
unsigned long long received; /* When the destination host received status
info from the source (migrations only). */
/* Computed values */
unsigned long long timeElapsed;
long long timeDelta; /* delta = received - sent, i.e., the difference
between the source and the destination time plus
the time between the end of Perform phase on the
source and the beginning of Finish phase on the
destination. */
bool timeDeltaSet;
/* Raw values from QEMU */
/* 任务统计信息类型 */
qemuDomainJobStatsType statsType;
union {
qemuMonitorMigrationStats mig;
qemuMonitorDumpStats dump;
qemuDomainBackupStats backup;
} stats;
qemuDomainMirrorStats mirrorStats;
......
}
接口
qemuDomainObjBeginAsyncJob
—— 标记异步任务开始
int qemuDomainObjBeginAsyncJob(virQEMUDriverPtr driver,
virDomainObjPtr obj,
qemuDomainAsyncJob asyncJob,
virDomainJobOperation operation,
unsigned long apiFlags)
qemuDomainObjEndAsyncJob
—— 标记异步任务结束
void qemuDomainObjEndAsyncJob(virQEMUDriverPtr driver, virDomainObjPtr obj)
嵌套异步任务
- 嵌套异步任务复用了异步任务的数据结构,它的同步任务标记为
QEMU_JOB_ASYNC_NESTED
,嵌套任务与被嵌套任务的异步任务类型相同。接口如下: qemuDomainObjBeginNestedJob
—— 标记异步嵌套任务开始
int qemuDomainObjBeginNestedJob(virQEMUDriverPtr driver,
virDomainObjPtr obj,
qemuDomainAsyncJob asyncJob)
qemuDomainObjEndJob
—— 标记异步嵌套任务结束
void qemuDomainObjEndJob(virQEMUDriverPtr driver, virDomainObjPtr obj)
流程分析
异步任务开始 —— 无阻塞情况
- 我们以虚机的快照API举例,分析标记同步任务开始的核心流程,快照的流程大体如下:
qemuDomainSnapshotCreateXML /* 开始快照API */
qemuDomainObjFromDomain /* 获取要快照的VM,对VM加锁 */
qemuSnapshotCreateXML
/* 标记快照任务开始:
* 同步任务qemuDomainJob:QEMU_JOB_ASYNC
* 异步任务qemuDomainAsyncJob:VIR_DOMAIN_JOB_OPERATION_SNAPSHOT
* 异步任务操作virDomainJobOperation:VIR_DOMAIN_JOB_OPERATION_SNAPSHOT
* */
qemuDomainObjBeginAsyncJob(driver, vm, QEMU_ASYNC_JOB_SNAPSHOT,
VIR_DOMAIN_JOB_OPERATION_SNAPSHOT, flags)
/* 设置快照任务不允许其它任务并发 */
qemuDomainObjSetAsyncJobMask(vm, QEMU_JOB_NONE)
/* 快照核心逻辑 */
......
/* 结束异步任务 */
qemuDomainObjEndAsyncJob(driver, vm)
/* 结束快照API,释放VM锁 */
virDomainObjEndAPI(&vm)
qemuDomainObjBeginAsyncJob
/* 标记任务开始的核心函数 */
qemuDomainObjBeginJobInternal(driver, obj, QEMU_JOB_ASYNC,
QEMU_AGENT_JOB_NONE,
asyncJob, false)
priv = obj->privateData;
priv->job.current->operation = operation;
priv->job.apiFlags = apiFlags;
- 我们仔细分析
qemuDomainObjBeginJobInternal
函数,将异步任务的部分摘出来分析:
static int ATTRIBUTE_NONNULL(1)
qemuDomainObjBeginJobInternal(virQEMUDriverPtr driver,
virDomainObjPtr obj,
qemuDomainJob job,
qemuDomainAgentJob agentJob,
qemuDomainAsyncJob asyncJob,
bool nowait)
{
qemuDomainObjPrivatePtr priv = obj->privateData;
unsigned long long now;
unsigned long long then;
/* 并非嵌套任务,这里nested为false */
bool nested = job == QEMU_JOB_ASYNC_NESTED;
/* async为true */
bool async = job == QEMU_JOB_ASYNC;
g_autoptr(virQEMUDriverConfig) cfg = virQEMUDriverGetConfig(driver);
if (virTimeMillisNow(&now) < 0)
return -1;
/* 统计入队的任务数 */
priv->jobs_queued++;
/* 设置等待任务的超时时间 */
then = now + QEMU_JOB_WAIT_TIME;
retry:
if ((!async && job != QEMU_JOB_DESTROY) &&
cfg->maxQueuedJobs &&
priv->jobs_queued > cfg->maxQueuedJobs) {
goto error;
}
/* 如果不是嵌套任务,需要通过qemuDomainNestedJobAllowed判断是否直接标记任务开始
* 如果检查返回fals,需要在异步任务信号量上等待,否则跳过等待,检查的逻辑如下:
* return !jobs->asyncJob || newJob == QEMU_JOB_NONE || (jobs->mask & JOB_MASK(newJob)) != 0;
* 如果当前虚机有异步任务,返回结果取决于后面两个判断,这里没有异步任务,因此返回true。不需要等待。
* */
while (!nested && !qemuDomainNestedJobAllowed(&priv->job, job)) {
if (virCondWaitUntil(&priv->job.asyncCond, &obj->parent.lock, then) < 0)
goto error;
}
/* 通过qemuDomainObjCanSetJob判断当前虚机是否有同步任务,如果函数返回false
* 同样要在同步任务信号量上等待,否则直接标记任务开始,检查逻辑如下:
* return ((newJob == QEMU_JOB_NONE || job->active == QEMU_JOB_NONE) &&
* 如果当前虚机上有同步任务,必然返回false,需要在同步任务信号量上等待其完成
* 如果虚机上没有同步任务,还要检查是否有agent的任务,如果有agent的任务,仍然需要等待agent任务完成
* 只有虚机既无同步任务,又无agent任务,才可以直接跳过等待,直接标记任务开始
* (newAgentJob == QEMU_AGENT_JOB_NONE || job->agentActive == QEMU_AGENT_JOB_NONE))
*/
while (!qemuDomainObjCanSetJob(&priv->job, job, agentJob)) {
if (virCondWaitUntil(&priv->job.cond, &obj->parent.lock, then) < 0)
goto error;
}
......
if (job) {
/* 复位虚机的任务 */
qemuDomainObjResetJob(&priv->job);
/* 如果要标记的是同步任务,设置priv->job.active */
if (job != QEMU_JOB_ASYNC) {
priv->job.active = job;
priv->job.owner = virThreadSelfID();
priv->job.ownerAPI = virThreadJobGet();
priv->job.started = now;
} else {
/* 如果要标记的是异步任务,设置priv->job.asyncJob */
qemuDomainObjResetAsyncJob(&priv->job);
priv->job.current = g_new0(qemuDomainJobInfo, 1);
priv->job.current->status = QEMU_DOMAIN_JOB_STATUS_ACTIVE;
priv->job.asyncJob = asyncJob;
priv->job.asyncOwner = virThreadSelfID();
priv->job.asyncOwnerAPI = virThreadJobGet();
priv->job.asyncStarted = now;
priv->job.current->started = now;
}
}
......
异步任务开始 —— 阻塞情况
- 假设在执行快照任务的过程中,Libvirtd收到rpc消息,执行迁移任务,我们分析标记任务开始的流程,迁移的流程大体如下:
qemuDomainMigratePerform
qemuDomainObjFromDomain
qemuMigrationSrcPerform
qemuMigrationSrcPerformJob
qemuMigrationJobStart(driver, vm, QEMU_ASYNC_JOB_MIGRATION_OUT, flags)
qemuDomainObjBeginAsyncJob
qemuDomainObjBeginJobInternal
......
qemuMigrationJobFinish
virDomainObjEndAPI
- 对于当前虚机有快照这个异步任务的情况,我们再次分析
qemuDomainObjBeginJobInternal
函数:
static int ATTRIBUTE_NONNULL(1)
qemuDomainObjBeginJobInternal(virQEMUDriverPtr driver,
virDomainObjPtr obj,
qemuDomainJob job,
qemuDomainAgentJob agentJob,
qemuDomainAsyncJob asyncJob,
bool nowait)
{
qemuDomainObjPrivatePtr priv = obj->privateData;
unsigned long long now;
unsigned long long then;
/* 并非嵌套任务,这里nested为false */
bool nested = job == QEMU_JOB_ASYNC_NESTED;
/* async为true */
bool async = job == QEMU_JOB_ASYNC;
g_autoptr(virQEMUDriverConfig) cfg = virQEMUDriverGetConfig(driver);
if (virTimeMillisNow(&now) < 0)
return -1;
/* 统计入队的任务数 */
priv->jobs_queued++;
/* 设置等待任务的超时时间 */
then = now + QEMU_JOB_WAIT_TIME;
retry:
if ((!async && job != QEMU_JOB_DESTROY) &&
cfg->maxQueuedJobs &&
priv->jobs_queued > cfg->maxQueuedJobs) {
goto error;
}
/* 如果不是嵌套任务,需要通过qemuDomainNestedJobAllowed判断是否直接标记任务开始
* 如果检查返回fals,需要在异步任务信号量上等待,否则跳过等待,检查的逻辑如下:
* return !jobs->asyncJob || newJob == QEMU_JOB_NONE || (jobs->mask & JOB_MASK(newJob)) != 0;
* 如果当前虚机有异步任务,返回结果取决于后面两个判断,这里有快照任务
* 则需要检查快照任务执行的同时是否允许迁移任务并发执行,这里是不允许的,因为
* 因为在快照API中,标记任务开始后,通过
* qemuDomainObjSetAsyncJobMask(vm, QEMU_JOB_NONE) 立即设置了允许并发执行的异步任务。
* */
while (!nested && !qemuDomainNestedJobAllowed(&priv->job, job)) {
if (virCondWaitUntil(&priv->job.asyncCond, &obj->parent.lock, then) < 0)
goto error;
}
/* 通过qemuDomainObjCanSetJob判断当前虚机是否有同步任务,如果函数返回false
* 同样要在同步任务信号量上等待,否则直接标记任务开始,检查逻辑如下:
* return ((newJob == QEMU_JOB_NONE || job->active == QEMU_JOB_NONE) &&
* 如果当前虚机上有同步任务,必然返回false,需要在同步任务信号量上等待其完成
* 如果虚机上没有同步任务,还要检查是否有agent的任务,如果有agent的任务,仍然需要等待agent任务完成
* 只有虚机既无同步任务,又无agent任务,才可以直接跳过等待,直接标记任务开始
* (newAgentJob == QEMU_AGENT_JOB_NONE || job->agentActive == QEMU_AGENT_JOB_NONE))
*/
while (!qemuDomainObjCanSetJob(&priv->job, job, agentJob)) {
if (virCondWaitUntil(&priv->job.cond, &obj->parent.lock, then) < 0)
goto error;
}
......
if (job) {
/* 复位虚机的任务 */
qemuDomainObjResetJob(&priv->job);
/* 如果要标记的是同步任务,设置priv->job.active */
if (job != QEMU_JOB_ASYNC) {
priv->job.active = job;
priv->job.owner = virThreadSelfID();
priv->job.ownerAPI = virThreadJobGet();
priv->job.started = now;
} else {
/* 如果要标记的是异步任务,设置priv->job.asyncJob */
qemuDomainObjResetAsyncJob(&priv->job);
......
priv->job.asyncJob = asyncJob;
......
}
}
......
嵌套任务开始
- 标记迁移任务后,API在调用qmp命令发起迁移前,需要标记异步嵌套任务开始,这个在
qemuDomainObjEnterMonitorAsync
完成。流程如下:
qemuMigrationSrcRun
qemuDomainObjEnterMonitorAsync(driver, vm, QEMU_ASYNC_JOB_MIGRATION_OUT)
qemuDomainObjEnterMonitorInternal
qemuDomainObjBeginNestedJob
qemuMonitorMigrateToFd
......
qemuDomainObjExitMonitor
- 分析
qemuDomainObjEnterMonitorAsync
的实现:
qemuDomainObjEnterMonitorAsync(driver, vm, QEMU_ASYNC_JOB_MIGRATION_OUT)
qemuDomainObjEnterMonitorInternal(driver, obj, asyncJob)
/* 如果存在异步任务,进入嵌套异步任务的标记流程 */
if (asyncJob != QEMU_ASYNC_JOB_NONE)
qemuDomainObjBeginNestedJob(driver, obj, asyncJob)
- 仔细分析
qemuDomainObjBeginNestedJob
的逻辑:
int
qemuDomainObjBeginNestedJob(virQEMUDriverPtr driver,
virDomainObjPtr obj,
qemuDomainAsyncJob asyncJob)
{
qemuDomainObjPrivatePtr priv = obj->privateData;
/* 如果要发起的嵌套任务和虚机正在执行的任务不同,报错返回 */
if (asyncJob != priv->job.asyncJob) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("unexpected async job %d type expected %d"),
asyncJob, priv->job.asyncJob);
return -1;
}
/* 通常都是同一个线程在第一次标记异步任务开始后,再次标记嵌套异步任务开始
* 因此线程ID是一样的,如果不一样,打印告警信息 */
if (priv->job.asyncOwner != virThreadSelfID()) {
VIR_WARN("This thread doesn't seem to be the async job owner: %llu",
priv->job.asyncOwner);
}
/* 如果检查没有问题,进入公共的任务标记流程,这里我们看到传入的同步任务是
* QEMU_JOB_ASYNC_NESTED,这个能理解,但异步任务是
* QEMU_ASYNC_JOB_NONE,为什么?因为异步任务标记了
* 嵌套任务和异步任务的类型一样,这里传入QEMU_ASYNC_JOB_NONE不影响异步任务的标记 */
return qemuDomainObjBeginJobInternal(driver, obj,
QEMU_JOB_ASYNC_NESTED,
QEMU_AGENT_JOB_NONE,
QEMU_ASYNC_JOB_NONE,
false);
}
任务结束
- 结束同步任务的核心函数是
qemuDomainObjEndJob
,如下:
void qemuDomainObjEndJob(virQEMUDriverPtr driver, virDomainObjPtr obj)
{
qemuDomainObjPrivatePtr priv = obj->privateData;
qemuDomainJob job = priv->job.active;
priv->jobs_queued--;
/* 复位同步任务 */
qemuDomainObjResetJob(&priv->job);
if (qemuDomainTrackJob(job))
qemuDomainObjSaveStatus(driver, obj);
/* We indeed need to wake up ALL threads waiting because
* grabbing a job requires checking more variables. */
/* 唤醒所有等待在同步任务信号量上的线程 */
virCondBroadcast(&priv->job.cond);
}