当前位置: 首页 > 工具软件 > Libvirt > 使用案例 >

Libvirt同步机制 —— 实现原理

柯乐童
2023-12-01

前言

  • 我们在Libvirt同步机制 —— 设计原理中详细分析了Libvirt项目涉及到的几种需要多线程同步的场景以及基于这些场景Libvirt设计的对应同步机制,本文以此为基础,继续分析这些同步机制的数据结构设计,接口设计以及流程实现

VM同步

数据结构

  • 虚机的数据结构如下,parent字段指向了VM的基类,保存了用于同步VM数据结构的互斥锁。privateData字段保存了不同驱动的私有数据,lxc和qemu各不相同,qemu驱动中该指针为qemuDomainObjPrivatePtr类型,qemuDomainObjPrivate.job保存了任务相关数据结构。
struct _virDomainObj {
    virObjectLockable parent;
	......
    void *privateData;
	......
}
typedef struct _virDomainObj virDomainObj;

接口

  • 获取VM结构体virDomainObjPtr
virDomainObjPtr qemuDomainObjFromDomain(virDomainPtr domain)
virDomainObjPtr virDomainObjListFindByUUID(virDomainObjListPtr doms,  const unsigned char *uuid)
virDomainObjPtr virDomainObjListFindByName(virDomainObjListPtr doms, const char *name)                                                 

同步任务

数据结构

qemuDomainJob

  • qemuDomainJob定义了对虚拟机操作的所有同步任务,其中QEMU_JOB_NONE 表示没有任务,QEMU_JOB_ASYNC比较特殊,当异步任务发起时需要标记虚机的同步任务为QEMU_JOB_ASYNC,Libvirt要求针对同一个VM,同一时间只能允许一下任务中的其中一个执行。
/* Only 1 job is allowed at any time
 * A job includes *all* monitor commands, even those just querying
 * information, not merely actions */
typedef enum {
    QEMU_JOB_NONE = 0,  /* Always set to 0 for easy if (jobActive) conditions */
    QEMU_JOB_QUERY,         /* Doesn't change any state */
    QEMU_JOB_DESTROY,       /* Destroys the domain (cannot be masked out) */
    QEMU_JOB_SUSPEND,       /* Suspends (stops vCPUs) the domain */
    QEMU_JOB_MODIFY,        /* May change state */
    QEMU_JOB_ABORT,         /* Abort current async job */
    QEMU_JOB_MIGRATION_OP,  /* Operation influencing outgoing migration */

    /* The following two items must always be the last items before JOB_LAST */
    QEMU_JOB_ASYNC,         /* Asynchronous job */
    QEMU_JOB_ASYNC_NESTED,  /* Normal job within an async job */

    QEMU_JOB_LAST
} qemuDomainJob;

qemuDomainJobObj

  • qemuDomainJobObj是描述任务的数据结构,抽象了所有同步、异步、嵌套异步任务的信息。
struct _qemuDomainJobObj {
	/* 同步任务信号量,当VM有同步任务在执行时
	 * 新发起的任务会等待在该信号量上,直到当前任务结束后被唤醒 
	 * */
    virCond cond;                       /* Use to coordinate jobs */

    /* The following members are for QEMU_JOB_* */
    /* 保存当前虚机正在执行的同步任务,如果没有,为QEMU_JOB_NONE  */
    qemuDomainJob active;               /* Currently running job */
    /* 执行当前任务的线程ID */
    unsigned long long owner;           /* Thread id which set current job */
    const char *ownerAPI;               /* The API which owns the job */
    /* 开始执行该任务的时间戳 */
    unsigned long long started;         /* When the current job started */
	......
}

接口

  • qemuDomainObjBeginJob —— 标记同步任务开始
int qemuDomainObjBeginJob(virQEMUDriverPtr driver,
                          virDomainObjPtr obj,
                          qemuDomainJob job)
  • qemuDomainObjEndJob —— 标记同步任务结束
void qemuDomainObjEndJob(virQEMUDriverPtr driver, virDomainObjPtr obj)

异步任务

数据结构

qemuDomainAsyncJob

  • 异步任务允许在mask中的其它任务并发执行。并发任务可以是同步任务,也可以是异步任务。
/* Async job consists of a series of jobs that may change state. Independent
 * jobs that do not change state (and possibly others if explicitly allowed by
 * current async job) are allowed to be run even if async job is active.
 */
typedef enum {
    QEMU_ASYNC_JOB_NONE = 0,
    /* 迁移任务:迁出 */
    QEMU_ASYNC_JOB_MIGRATION_OUT,
    /* 迁移任务:迁入 */
    QEMU_ASYNC_JOB_MIGRATION_IN,
    QEMU_ASYNC_JOB_SAVE,
    QEMU_ASYNC_JOB_DUMP,
    /* 快照任务 */
    QEMU_ASYNC_JOB_SNAPSHOT,
    QEMU_ASYNC_JOB_START,
    QEMU_ASYNC_JOB_BACKUP,

    QEMU_ASYNC_JOB_LAST
} qemuDomainAsyncJob;

qemuDomainJobObj

  • qemuDomainJobObj中除了定义同步任务,还包含了异步任务的信息,如下:
struct _qemuDomainJobObj {
	......
    /* The following members are for QEMU_ASYNC_JOB_* */
    /* 异步任务信号量,当VM上有异步任务在执行时 
     * 新发起的任务如果不在当前任务允许并发执行的列表里,需要等待在该信号量上直到当前任务结束
     * */
    virCond asyncCond;                  /* Use to coordinate with async jobs */
    /* 保存当前正在执行的异步任务 */
    qemuDomainAsyncJob asyncJob;        /* Currently active async job */
    /* 异步任务的线程ID */
    unsigned long long asyncOwner;      /* Thread which set current async job */
    
    const char *asyncOwnerAPI;          /* The API which owns the async job */
    /* 异步任务开始时间戳 */
    unsigned long long asyncStarted;    /* When the current async job started */
    int phase;                          /* Job phase (mainly for migrations) */
    /* 异步任务执行过程中,允许并发执行其它任务的任务白名单*/
    unsigned long long mask;            /* Jobs allowed during async job */
    /* 异步任务信息 */
    qemuDomainJobInfoPtr current;       /* async job progress data */
    qemuDomainJobInfoPtr completed;     /* statistics data of a recently completed job */
	......
}

qemuDomainJobStatus

  • 异步任务是耗时比较长的任务,通常Libvirt会通过主动查询或者被动等待Qemu的事件,来检查任务是否完成,为了识别当前异步任务的状态,Libvirt定义了qemuDomainJobStatus,当Qemu执行这个异步任务有事件更新时,Libvirt可以用这个类型的变量来保存,方便外部的用户查询或者其它API逻辑做判断。
typedef enum {
    QEMU_DOMAIN_JOB_STATUS_NONE = 0,
    /* 任务正在执行中 */
    QEMU_DOMAIN_JOB_STATUS_ACTIVE,
    /* 迁移任务正在执行中 */
    QEMU_DOMAIN_JOB_STATUS_MIGRATING,
    /* QEMU侧已经完成了异步任务,但此时Libvirt可能没有结束 */
    QEMU_DOMAIN_JOB_STATUS_QEMU_COMPLETED,
    QEMU_DOMAIN_JOB_STATUS_PAUSED,
    /* 迁移任务:处于postcopy阶段 */
    QEMU_DOMAIN_JOB_STATUS_POSTCOPY,
    /* Libvirt已经完成该任务 */
    QEMU_DOMAIN_JOB_STATUS_COMPLETED,
    QEMU_DOMAIN_JOB_STATUS_FAILED,
    QEMU_DOMAIN_JOB_STATUS_CANCELED,
} qemuDomainJobStatus;

qemuDomainJobInfo

  • Libvirt把异步任务相关的具体信息抽象成qemuDomainJobInfo,如下:
typedef struct _qemuDomainJobInfo qemuDomainJobInfo;
struct _qemuDomainJobInfo {
	/* 任务状态 */
    qemuDomainJobStatus status;
    virDomainJobOperation operation;
    /* 任务开始时间戳 */
    unsigned long long started; /* When the async job started */
    unsigned long long stopped; /* When the domain's CPUs were stopped */
    unsigned long long sent; /* When the source sent status info to the
                                destination (only for migrations). */
    unsigned long long received; /* When the destination host received status
                                    info from the source (migrations only). */
    /* Computed values */
    unsigned long long timeElapsed;
    long long timeDelta; /* delta = received - sent, i.e., the difference
                            between the source and the destination time plus
                            the time between the end of Perform phase on the
                            source and the beginning of Finish phase on the
                            destination. */
    bool timeDeltaSet;
    /* Raw values from QEMU */
    /* 任务统计信息类型 */
    qemuDomainJobStatsType statsType;
    union {
        qemuMonitorMigrationStats mig;
        qemuMonitorDumpStats dump;
        qemuDomainBackupStats backup;
    } stats;
    qemuDomainMirrorStats mirrorStats;
	......
}

接口

  • qemuDomainObjBeginAsyncJob —— 标记异步任务开始
int qemuDomainObjBeginAsyncJob(virQEMUDriverPtr driver,
                               virDomainObjPtr obj,
                               qemuDomainAsyncJob asyncJob,
                               virDomainJobOperation operation,
                               unsigned long apiFlags)
  • qemuDomainObjEndAsyncJob —— 标记异步任务结束
void qemuDomainObjEndAsyncJob(virQEMUDriverPtr driver, virDomainObjPtr obj)

嵌套异步任务

  • 嵌套异步任务复用了异步任务的数据结构,它的同步任务标记为QEMU_JOB_ASYNC_NESTED,嵌套任务与被嵌套任务的异步任务类型相同。接口如下:
  • qemuDomainObjBeginNestedJob —— 标记异步嵌套任务开始
int qemuDomainObjBeginNestedJob(virQEMUDriverPtr driver,
                            	virDomainObjPtr obj,
                            	qemuDomainAsyncJob asyncJob)
  • qemuDomainObjEndJob —— 标记异步嵌套任务结束
void qemuDomainObjEndJob(virQEMUDriverPtr driver, virDomainObjPtr obj)

流程分析

异步任务开始 —— 无阻塞情况

  • 我们以虚机的快照API举例,分析标记同步任务开始的核心流程,快照的流程大体如下:
qemuDomainSnapshotCreateXML	/* 开始快照API */
	qemuDomainObjFromDomain	/* 获取要快照的VM,对VM加锁 */
	qemuSnapshotCreateXML
		/* 标记快照任务开始:
		 * 同步任务qemuDomainJob:QEMU_JOB_ASYNC 
		 * 异步任务qemuDomainAsyncJob:VIR_DOMAIN_JOB_OPERATION_SNAPSHOT
		 * 异步任务操作virDomainJobOperation:VIR_DOMAIN_JOB_OPERATION_SNAPSHOT
		 * */
	    qemuDomainObjBeginAsyncJob(driver, vm, QEMU_ASYNC_JOB_SNAPSHOT,
                                   VIR_DOMAIN_JOB_OPERATION_SNAPSHOT, flags)
         /* 设置快照任务不允许其它任务并发 */
        qemuDomainObjSetAsyncJobMask(vm, QEMU_JOB_NONE)
        /* 快照核心逻辑 */
        ......
        /* 结束异步任务 */
        qemuDomainObjEndAsyncJob(driver, vm)
    /* 结束快照API,释放VM锁 */
    virDomainObjEndAPI(&vm)
  • 标记异步任务开始流程:
qemuDomainObjBeginAsyncJob
	/* 标记任务开始的核心函数 */
	qemuDomainObjBeginJobInternal(driver, obj, QEMU_JOB_ASYNC,
                                  QEMU_AGENT_JOB_NONE,
                                  asyncJob, false)
	priv = obj->privateData;
    priv->job.current->operation = operation;
    priv->job.apiFlags = apiFlags;
  • 我们仔细分析qemuDomainObjBeginJobInternal函数,将异步任务的部分摘出来分析:
static int ATTRIBUTE_NONNULL(1)
qemuDomainObjBeginJobInternal(virQEMUDriverPtr driver,
                              virDomainObjPtr obj,
                              qemuDomainJob job,
                              qemuDomainAgentJob agentJob,
                              qemuDomainAsyncJob asyncJob,
                              bool nowait)
{
    qemuDomainObjPrivatePtr priv = obj->privateData;
    unsigned long long now;
    unsigned long long then;
    /* 并非嵌套任务,这里nested为false */
    bool nested = job == QEMU_JOB_ASYNC_NESTED;
    /* async为true */
    bool async = job == QEMU_JOB_ASYNC;
    g_autoptr(virQEMUDriverConfig) cfg = virQEMUDriverGetConfig(driver);

    if (virTimeMillisNow(&now) < 0)
        return -1;
	/* 统计入队的任务数 */
    priv->jobs_queued++;
    /* 设置等待任务的超时时间 */
    then = now + QEMU_JOB_WAIT_TIME;

 retry:
    if ((!async && job != QEMU_JOB_DESTROY) &&
        cfg->maxQueuedJobs &&
        priv->jobs_queued > cfg->maxQueuedJobs) {
        goto error;
    }
	/* 如果不是嵌套任务,需要通过qemuDomainNestedJobAllowed判断是否直接标记任务开始
	 * 如果检查返回fals,需要在异步任务信号量上等待,否则跳过等待,检查的逻辑如下:
	 * return !jobs->asyncJob || newJob == QEMU_JOB_NONE || (jobs->mask & JOB_MASK(newJob)) != 0;    
     * 如果当前虚机有异步任务,返回结果取决于后面两个判断,这里没有异步任务,因此返回true。不需要等待。
	 * */
    while (!nested && !qemuDomainNestedJobAllowed(&priv->job, job)) {
        if (virCondWaitUntil(&priv->job.asyncCond, &obj->parent.lock, then) < 0)
            goto error;
    }
    /* 通过qemuDomainObjCanSetJob判断当前虚机是否有同步任务,如果函数返回false
     * 同样要在同步任务信号量上等待,否则直接标记任务开始,检查逻辑如下:
     * return ((newJob == QEMU_JOB_NONE || job->active == QEMU_JOB_NONE)  &&
     * 如果当前虚机上有同步任务,必然返回false,需要在同步任务信号量上等待其完成
     * 如果虚机上没有同步任务,还要检查是否有agent的任务,如果有agent的任务,仍然需要等待agent任务完成
     * 只有虚机既无同步任务,又无agent任务,才可以直接跳过等待,直接标记任务开始
     *         (newAgentJob == QEMU_AGENT_JOB_NONE ||  job->agentActive == QEMU_AGENT_JOB_NONE))
     */
    while (!qemuDomainObjCanSetJob(&priv->job, job, agentJob)) {
        if (virCondWaitUntil(&priv->job.cond, &obj->parent.lock, then) < 0)
            goto error;
    }
	......	
    if (job) {
    	/* 复位虚机的任务 */
        qemuDomainObjResetJob(&priv->job);
	    /* 如果要标记的是同步任务,设置priv->job.active */
        if (job != QEMU_JOB_ASYNC) {
            priv->job.active = job;
            priv->job.owner = virThreadSelfID();
            priv->job.ownerAPI = virThreadJobGet();
            priv->job.started = now;
        } else {
        	   /* 如果要标记的是异步任务,设置priv->job.asyncJob */
            qemuDomainObjResetAsyncJob(&priv->job);
            priv->job.current = g_new0(qemuDomainJobInfo, 1);
            priv->job.current->status = QEMU_DOMAIN_JOB_STATUS_ACTIVE;
            priv->job.asyncJob = asyncJob;
            priv->job.asyncOwner = virThreadSelfID();
            priv->job.asyncOwnerAPI = virThreadJobGet();
            priv->job.asyncStarted = now;
            priv->job.current->started = now;
        }
    }
	......

异步任务开始 —— 阻塞情况

  • 假设在执行快照任务的过程中,Libvirtd收到rpc消息,执行迁移任务,我们分析标记任务开始的流程,迁移的流程大体如下:
qemuDomainMigratePerform
	qemuDomainObjFromDomain
	qemuMigrationSrcPerform
		qemuMigrationSrcPerformJob
			qemuMigrationJobStart(driver, vm, QEMU_ASYNC_JOB_MIGRATION_OUT,  flags) 
				qemuDomainObjBeginAsyncJob
					qemuDomainObjBeginJobInternal
			......
            qemuMigrationJobFinish          
	virDomainObjEndAPI
  • 对于当前虚机有快照这个异步任务的情况,我们再次分析qemuDomainObjBeginJobInternal函数:
static int ATTRIBUTE_NONNULL(1)
qemuDomainObjBeginJobInternal(virQEMUDriverPtr driver,
                              virDomainObjPtr obj,
                              qemuDomainJob job,
                              qemuDomainAgentJob agentJob,
                              qemuDomainAsyncJob asyncJob,
                              bool nowait)
{
    qemuDomainObjPrivatePtr priv = obj->privateData;
    unsigned long long now;
    unsigned long long then;
    /* 并非嵌套任务,这里nested为false */
    bool nested = job == QEMU_JOB_ASYNC_NESTED;
    /* async为true */
    bool async = job == QEMU_JOB_ASYNC;
    g_autoptr(virQEMUDriverConfig) cfg = virQEMUDriverGetConfig(driver);

    if (virTimeMillisNow(&now) < 0)
        return -1;
	/* 统计入队的任务数 */
    priv->jobs_queued++;
    /* 设置等待任务的超时时间 */
    then = now + QEMU_JOB_WAIT_TIME;

 retry:
    if ((!async && job != QEMU_JOB_DESTROY) &&
        cfg->maxQueuedJobs &&
        priv->jobs_queued > cfg->maxQueuedJobs) {
        goto error;
    }
	/* 如果不是嵌套任务,需要通过qemuDomainNestedJobAllowed判断是否直接标记任务开始
	 * 如果检查返回fals,需要在异步任务信号量上等待,否则跳过等待,检查的逻辑如下:
	 * return !jobs->asyncJob || newJob == QEMU_JOB_NONE || (jobs->mask & JOB_MASK(newJob)) != 0;    
     * 如果当前虚机有异步任务,返回结果取决于后面两个判断,这里有快照任务
     * 则需要检查快照任务执行的同时是否允许迁移任务并发执行,这里是不允许的,因为
     * 因为在快照API中,标记任务开始后,通过
     * qemuDomainObjSetAsyncJobMask(vm, QEMU_JOB_NONE) 立即设置了允许并发执行的异步任务。
	 * */
    while (!nested && !qemuDomainNestedJobAllowed(&priv->job, job)) {
        if (virCondWaitUntil(&priv->job.asyncCond, &obj->parent.lock, then) < 0)
            goto error;
    }
    /* 通过qemuDomainObjCanSetJob判断当前虚机是否有同步任务,如果函数返回false
     * 同样要在同步任务信号量上等待,否则直接标记任务开始,检查逻辑如下:
     * return ((newJob == QEMU_JOB_NONE || job->active == QEMU_JOB_NONE)  &&
     * 如果当前虚机上有同步任务,必然返回false,需要在同步任务信号量上等待其完成
     * 如果虚机上没有同步任务,还要检查是否有agent的任务,如果有agent的任务,仍然需要等待agent任务完成
     * 只有虚机既无同步任务,又无agent任务,才可以直接跳过等待,直接标记任务开始
     *         (newAgentJob == QEMU_AGENT_JOB_NONE ||  job->agentActive == QEMU_AGENT_JOB_NONE))
     */
    while (!qemuDomainObjCanSetJob(&priv->job, job, agentJob)) {
        if (virCondWaitUntil(&priv->job.cond, &obj->parent.lock, then) < 0)
            goto error;
    }
	......	
    if (job) {
    	/* 复位虚机的任务 */
        qemuDomainObjResetJob(&priv->job);
	    /* 如果要标记的是同步任务,设置priv->job.active */
        if (job != QEMU_JOB_ASYNC) {
            priv->job.active = job;
            priv->job.owner = virThreadSelfID();
            priv->job.ownerAPI = virThreadJobGet();
            priv->job.started = now;
        } else {
        	/* 如果要标记的是异步任务,设置priv->job.asyncJob */
            qemuDomainObjResetAsyncJob(&priv->job);
		    ......
            priv->job.asyncJob = asyncJob;
 	        ......
        }
    }
	......

嵌套任务开始

  • 标记迁移任务后,API在调用qmp命令发起迁移前,需要标记异步嵌套任务开始,这个在qemuDomainObjEnterMonitorAsync完成。流程如下:
qemuMigrationSrcRun
	qemuDomainObjEnterMonitorAsync(driver, vm, QEMU_ASYNC_JOB_MIGRATION_OUT) 
		qemuDomainObjEnterMonitorInternal                                    
  			qemuDomainObjBeginNestedJob
  	qemuMonitorMigrateToFd
  	......
  	qemuDomainObjExitMonitor
  • 分析qemuDomainObjEnterMonitorAsync的实现:
qemuDomainObjEnterMonitorAsync(driver, vm, QEMU_ASYNC_JOB_MIGRATION_OUT) 
	qemuDomainObjEnterMonitorInternal(driver, obj, asyncJob)
		/* 如果存在异步任务,进入嵌套异步任务的标记流程 */
		if (asyncJob != QEMU_ASYNC_JOB_NONE) 
			qemuDomainObjBeginNestedJob(driver, obj, asyncJob)
  • 仔细分析qemuDomainObjBeginNestedJob的逻辑:
int
qemuDomainObjBeginNestedJob(virQEMUDriverPtr driver,
                            virDomainObjPtr obj,
                            qemuDomainAsyncJob asyncJob)
{
    qemuDomainObjPrivatePtr priv = obj->privateData;
    /* 如果要发起的嵌套任务和虚机正在执行的任务不同,报错返回 */
    if (asyncJob != priv->job.asyncJob) {
        virReportError(VIR_ERR_INTERNAL_ERROR,
                       _("unexpected async job %d type expected %d"),
                       asyncJob, priv->job.asyncJob);
        return -1;
    }
    /* 通常都是同一个线程在第一次标记异步任务开始后,再次标记嵌套异步任务开始
     * 因此线程ID是一样的,如果不一样,打印告警信息 */
    if (priv->job.asyncOwner != virThreadSelfID()) {
        VIR_WARN("This thread doesn't seem to be the async job owner: %llu",
                 priv->job.asyncOwner);
    }
	/* 如果检查没有问题,进入公共的任务标记流程,这里我们看到传入的同步任务是
	 * QEMU_JOB_ASYNC_NESTED,这个能理解,但异步任务是
	 * QEMU_ASYNC_JOB_NONE,为什么?因为异步任务标记了
	 * 嵌套任务和异步任务的类型一样,这里传入QEMU_ASYNC_JOB_NONE不影响异步任务的标记 */
    return qemuDomainObjBeginJobInternal(driver, obj,
                                         QEMU_JOB_ASYNC_NESTED,
                                         QEMU_AGENT_JOB_NONE,
                                         QEMU_ASYNC_JOB_NONE,
                                         false);
}

任务结束

  • 结束同步任务的核心函数是qemuDomainObjEndJob,如下:
void qemuDomainObjEndJob(virQEMUDriverPtr driver, virDomainObjPtr obj)
{
    qemuDomainObjPrivatePtr priv = obj->privateData;
    qemuDomainJob job = priv->job.active;

    priv->jobs_queued--;
	/* 复位同步任务 */
    qemuDomainObjResetJob(&priv->job);
    if (qemuDomainTrackJob(job))
        qemuDomainObjSaveStatus(driver, obj);
    /* We indeed need to wake up ALL threads waiting because
     * grabbing a job requires checking more variables. */
    /* 唤醒所有等待在同步任务信号量上的线程 */
    virCondBroadcast(&priv->job.cond);
}
 类似资料: