当前位置: 首页 > 工具软件 > x265 > 使用案例 >

x265线程池机制和并行编码机制分析

卢皓轩
2023-12-01

    x265默认采用了帧级并行和帧内行级并行两种基础的并行编码方式。帧级并行即多帧同时编码;帧内行级并行主要利用HEVC的wavefront(WPP)编码工具来实现,可以同时编码多个CTU行。

    x265采用了基于线程池(threadpool)的多线程机制。threadPool中包含了众多的WorkerThread。WorkerThread是独立的线程,是具体干活的线程,线程的主循环函数为WorkerThread::threadMain()。给WorkerThread派活的叫做JobProvider。x265中FrameEncoder、WaveFront和Lookahead都属于JobProvider。JobProvider的findJob()函数是真正的干活过程。

    这里可以用甲方、包工头和工人做一个不太恰当的类比。threadpool相当于包工头,WorkerThread相当于工人,包工头手底下有很多工人。没活干的时候,工人处于休息(Sleep)状态。JobProvider则是甲方:甲方手里有工程时,向包工头(threadpool)提出需求(tryWakeOne),并提供对应的实施方法。包工头(threadpool)找到手下空闲的工人(WorkerThread),然后工人按照甲方的实施方法(WaveFront::findJob())开始干活。

    对于WPP编码,x265是如何用线程池进行编码的呢?

  WPP多线程编码中,FrameEncoder作为甲方(JobProvider)发布需求。具体过程在FrameEncoder::compressFrame()函数中,代码段如下所示。FrameEncoder通过调用tryWakeOne()向threadpool发布需求,但发布需求是有条件的。在HEVC编码中,WPP启动一行编码需要满足两个条件:1、当前行在参考帧中所参考的区域都已经编码重建;2、上一行的编码进度领先当前行至少2个CTU(为了获取正确的帧内预测参考像素)。x265中用m_externalDependencyBitmap和m_internalDependencyBitmap变量分别标识这两个条件是否满足。条件1满足,则调用enableRowEncoder()将m_externalDependencyBitmap中当前行所对应的位置为1;条件2满足,则调用enqueueRowEncoder()将m_internalDependencyBitmap中当前行所对应的位置为1。

    // Wavefront (WPP) dispatch: release each CTU row to the thread pool once its
    // external (reference-frame) dependency is met. Row 0 of every slice also has
    // its internal dependency cleared here, which starts the wavefront.
    if (m_param->bEnableWavefront)
    {
        // The outer loop walks the row offset within a slice and the inner loop visits
        // every slice, so row k of all slices is released before row k+1 of any slice.
        for (uint32_t rowInSlice = 0; rowInSlice < m_sliceGroupSize; rowInSlice++)
        {
            for (uint32_t sliceId = 0; sliceId < m_param->maxSlices; sliceId++)
            {
                const uint32_t sliceStartRow = m_sliceBaseRow[sliceId];
                const uint32_t sliceEndRow = m_sliceBaseRow[sliceId + 1] - 1;
                const uint32_t row = sliceStartRow + rowInSlice;

                // NOTE(review): this check runs before the slice-end test below; if the
                // last slice can be shorter than m_sliceGroupSize, row may reach m_numRows
                // here. Confirm how m_sliceBaseRow is built.
                X265_CHECK(row < m_numRows, "slices row fault was detected");

                // This slice has fewer rows than rowInSlice + 1: nothing to release.
                if (row > sliceEndRow)
                    continue;

                // block until all reference frames have reconstructed the rows we need
                // (the region of each reference that the current row may reference)
                for (int l = 0; l < numPredDir; l++)
                {
                    for (int ref = 0; ref < slice->m_numRefIdx[l]; ref++)
                    {
                        Frame *refpic = slice->m_refFrameList[l][ref];

                        // NOTE: no need to wait for rows beyond the current slice boundary,
                        // so the awaited row (row + m_refLagRows) is clamped to sliceEndRow
                        const int rowIdx = X265_MIN(sliceEndRow, (row + m_refLagRows));

                        while (refpic->m_reconRowFlag[rowIdx].get() == 0)
                            refpic->m_reconRowFlag[rowIdx].waitForChange(0);  // wait until row rowIdx of the reference is reconstructed

                        // Weighted prediction: call applyWeight on this reference for the awaited row
                        if ((bUseWeightP || bUseWeightB) && m_mref[l][ref].isWeighted)
                            m_mref[l][ref].applyWeight(rowIdx, m_numRows, sliceEndRow, sliceId);
                    }
                }

                enableRowEncoder(m_row_to_idx[row]); /* clear external dependency for this row: its external dependency is now satisfied */
                if (!rowInSlice)  // the first row of a slice depends on no other row in the frame, so its internal dependency is already met
                {
                    m_row0WaitTime = x265_mdate();
                    enqueueRowEncoder(m_row_to_idx[row]); /* clear internal dependency, start wavefront: this row's internal dependency is satisfied */
                }
                tryWakeOne();    // post one request to the thread pool
            } // end of loop sliceId
        } // end of loop rowInSlice


        tryWakeOne(); /* ensure one thread is active or help-wanted flag is set prior to blocking */

        // Until the frame finishes encoding, re-post a request to the thread pool every
        // block_ms (250 ms); timedWait() is expected to return true on timeout.
        static const int block_ms = 250;
        while (m_completionEvent.timedWait(block_ms))
            tryWakeOne();
    }

    下面是函数tryWakeOne()的相关代码:首先让包工头(m_pool)找手底下有没有空闲的工人,如果所有工人都已经在干活了,则只设置一下m_helpWanted为true就直接返回了,后续甲方每隔250ms还会再来找包工头的;如果有空闲工人,则让该工人对接甲方(JobProvider),并且激活(worker.awaken)他。

void JobProvider::tryWakeOne()
{
    // 先让包工头找一下有没有空闲的工人
    int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
    if (id < 0) // 没有空闲工人,直接返回
    {
        m_helpWanted = true;
        return;
    }

    WorkerThread& worker = m_pool->m_workers[id];  // 当前空闲的工人
    if (worker.m_curJobProvider != this) /* poaching */
    {
        sleepbitmap_t bit = (sleepbitmap_t)1 << id;
        SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
        worker.m_curJobProvider = this;  // 将甲方赋值给该工人,方便工人后续按照甲方需求干活
        SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
    }
    worker.awaken();  // 激活该工人
}

    下面是工人(WorkerThread)的主函数。在工人的主循环函数中,调用甲方的findJob()来干具体的活。干完之后,会主动找下一个需要帮助(m_helpWanted==true)的甲方,继续干活(可称为主动接单模式)。如果没有找到有需求的甲方了,则进入Sleep状态,等待甲方有需求时主动激活(可称为被动接单模式)。

/* Main loop of a pool worker thread.
 *
 * Setup:
 *   - lower this thread's scheduling priority;
 *   - apply the pool's affinity;
 *   - register the worker as owned by job provider 0 and as sleeping;
 *   - wait to be woken.
 *
 * Each time the worker is woken while m_pool.m_isActive is true:
 *   - if m_bondMaster was set, process that bond group's tasks first;
 *   - then repeatedly call findJob() on the current provider;
 *   - after each call, switch to a provider that wants help and has a higher
 *     priority (lower m_sliceType);
 *   - stop when the current provider no longer wants help;
 *   - then mark the worker as sleeping again and wait for the next wake-up. */
void WorkerThread::threadMain()
{
    THREAD_NAME("Worker", m_id);

#if _WIN32
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
#else
    __attribute__((unused)) int val = nice(10);
#endif

    m_pool.setCurrentThreadAffinity();

    // This worker's bit in the sleep / owner bitmaps.
    sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
    m_curJobProvider = m_pool.m_jpTable[0];
    m_bondMaster = NULL;

    SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
    m_wakeEvent.wait();  // wait until a job provider (or bond group) wakes this worker

    while (m_pool.m_isActive)
    {
        // A bond group that acquired this worker while it slept set m_bondMaster; serve it first.
        if (m_bondMaster)
        {
            m_bondMaster->processTasks(m_id);
            m_bondMaster->m_exitedPeerCount.incr();
            m_bondMaster = NULL;
        }

        do
        {
            /* do pending work for current job provider */
            m_curJobProvider->findJob(m_id);  // carry out the current provider's job

            /* if the current job provider still wants help, only switch to a
             * higher priority provider (lower slice type). Else take the first
             * available job provider with the highest priority */
            // NOTE(review): INVALID_SLICE_PRIORITY + 1 is presumably above every valid
            // m_sliceType, so any provider wanting help qualifies in that case; confirm.
            int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
                                                                 INVALID_SLICE_PRIORITY + 1;
            // Actively look for another provider that wants help ("active pickup" mode).
            int nextProvider = -1;
            for (int i = 0; i < m_pool.m_numProviders; i++)
            {
                if (m_pool.m_jpTable[i]->m_helpWanted &&
                    m_pool.m_jpTable[i]->m_sliceType < curPriority)
                {
                    nextProvider = i;
                    curPriority = m_pool.m_jpTable[i]->m_sliceType;
                }
            }
            // Rebind: move this worker's owner bit from the old provider to the new one.
            if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
            {
                SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
                m_curJobProvider = m_pool.m_jpTable[nextProvider];
                SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
            }
        }
        while (m_curJobProvider->m_helpWanted);

        /* While the worker sleeps, a job-provider or bond-group may acquire this
         * worker's sleep bitmap bit. Once acquired, that thread may modify 
         * m_bondMaster or m_curJobProvider, then waken the thread */
        SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit); // the current provider no longer wants help: mark this worker as sleeping
        m_wakeEvent.wait();  // block until a provider wakes this worker again ("passive pickup" mode)
    }

    // Mark the worker as sleeping once more before the thread returns.
    // NOTE(review): the purpose is not visible here; presumably pool shutdown
    // expects every worker's sleep bit to be set. Confirm.
    SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
}

    WaveFront作为甲方之一,它的需求的具体实施过程在WaveFront::findJob()函数中,代码如下。CTZ(m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w])用于找到当前能编码的最上边的CTU行;processRow()用于处理一行的编码或者滤波。

/* Worker-side job routine of a wavefront: claim and process at most one ready CTU row.
 *
 * Readiness:
 *   - A row is ready when its bit is set in both m_internalDependencyBitmap and
 *     m_externalDependencyBitmap.
 *   - The bitmaps are scanned word by word (32 rows per word), lowest set bit
 *     first, so within a word the topmost ready row is tried first.
 *
 * Claiming:
 *   - A row is claimed by atomically clearing its internal-dependency bit.
 *   - Only the thread whose ATOMIC_AND still saw the bit set processes the row.
 *     This assumes ATOMIC_AND returns the previous value, as the test on its
 *     result implies.
 *
 * Result:
 *   - After processing one row, m_helpWanted is set and the function returns,
 *     so the calling worker can re-check for higher-priority providers.
 *   - If no ready row can be claimed, m_helpWanted is cleared.
 *
 * threadId: id of the calling worker, forwarded to processRow(). */
void WaveFront::findJob(int threadId)
{
    unsigned long id;

    /* Loop over each word until all available rows are finished */
    for (int w = 0; w < m_numWords; w++)
    {
        // Set bits mark rows whose internal and external dependencies are both satisfied.
        uint32_t oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
        while (oldval)
        {
            CTZ(id, oldval);  // index of the lowest set bit = topmost ready row in this word

            // Unsigned operand: `1 << 31` on a signed int shifts into the sign bit
            // (undefined in C++11/C), which happens whenever id == 31.
            uint32_t bit = (uint32_t)1 << id;
            if (ATOMIC_AND(&m_internalDependencyBitmap[w], ~bit) & bit) // clear this row's internal-dependency bit
            {
                /* we cleared the bit, we get to process the row */
                processRow(w * 32 + id, threadId);  // encode (or filter) this CTU row
                m_helpWanted = true;
                return; /* check for a higher priority task */
            }

            // Another thread cleared that bit first; re-read and try the next ready row.
            oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
        }
    }

    m_helpWanted = false;
}

待续......

 类似资料: