x265默认采用了帧级并行和帧内行级并行两种基础的并行编码。帧级并行即多帧同时编码,帧内行级并行主要利用HEVC的wavefront编码工具来实现,可以同时编码多个CTU行。
x265采用了基于线程池(threadpool)的多线程机制。threadPool中包含了众多的WorkerThread。WorkerThread是独立的线程,是具体干活的线程,线程的主循环函数为WorkerThread::threadMain()。给WorkerThread派活的叫做JobProvider。x265中FrameEncoder、WaveFront和Lookahead都属于JobProvider。JobProvider的findJob()函数是真正的干活过程。
这里可以用甲方、包工头和工人进行一个不恰当的类比。threadpool相当于包工头,而WorkerThread相当于工人,包工头手底下有很多工人。没活干的时候,工人处于休息(Sleep)状态。JobProvider则是甲方,甲方手里有工程时,对包工头(threadpool)提需求(tryWakeOne)及对应的实施方法。包工头(threadpool)找到手下空闲的工人(WorkThread)。然后,工人则根据甲方的实施方法(WaveFront::findJob())开始干活。
对于WPP编码,x265是如何用线程池进行编码的呢?
WPP多线程编码,FrameEncoder是作为甲方(JobProvider)发布需求的。具体过程在FrameEncode::compressFrame()函数中,代码段如下所示。FrameEncoder通过调用tryWakeOne()对threadpool发布需求,但发需求是有条件的。在HEVC编码中,WPP启动一行编码,需要满足两个条件:1、参考帧所参考的区域都已经编码重建;2、上一行的编码进度领先当前行至少2个CTU(为了获取正确的帧内预测参考像素)。x265中用m_externalDependencyBitmap和m_internalDependencyBitmap变量来分别标识这两个条件是否满足。条件1满足,则调用enableRowEncoder()将m_externalDependencyBitmap当前行所对应的位上置为1;条件2满足,则调用enqueueRowEncoder()将m_internalDependencyBitmap当前行所对应的位上置为1。
if (m_param->bEnableWavefront)
{
for (uint32_t rowInSlice = 0; rowInSlice < m_sliceGroupSize; rowInSlice++)
{
for (uint32_t sliceId = 0; sliceId < m_param->maxSlices; sliceId++)
{
const uint32_t sliceStartRow = m_sliceBaseRow[sliceId];
const uint32_t sliceEndRow = m_sliceBaseRow[sliceId + 1] - 1;
const uint32_t row = sliceStartRow + rowInSlice;
X265_CHECK(row < m_numRows, "slices row fault was detected");
if (row > sliceEndRow)
continue;
// block until all reference frames have reconstructed the rows we need
// 等待参考帧中当前行所参考的最大区域都重建好
for (int l = 0; l < numPredDir; l++)
{
for (int ref = 0; ref < slice->m_numRefIdx[l]; ref++)
{
Frame *refpic = slice->m_refFrameList[l][ref];
// NOTE: we unnecessary wait row that beyond current slice boundary
const int rowIdx = X265_MIN(sliceEndRow, (row + m_refLagRows));
while (refpic->m_reconRowFlag[rowIdx].get() == 0)
refpic->m_reconRowFlag[rowIdx].waitForChange(0); // 等待该行重建完成
if ((bUseWeightP || bUseWeightB) && m_mref[l][ref].isWeighted)
m_mref[l][ref].applyWeight(rowIdx, m_numRows, sliceEndRow, sliceId);
}
}
enableRowEncoder(m_row_to_idx[row]); /* clear external dependency for this row,当期行的外部依赖已经满足 */
if (!rowInSlice) // 第0行不依赖帧内其他行,即满足帧内依赖关系
{
m_row0WaitTime = x265_mdate();
enqueueRowEncoder(m_row_to_idx[row]); /* clear internal dependency, start wavefront, 当前行的内部依赖已经满足 */
}
tryWakeOne(); // 对线程池发布一次需求
} // end of loop rowInSlice
} // end of loop sliceId
tryWakeOne(); /* ensure one thread is active or help-wanted flag is set prior to blocking */
// 如果一帧没有编码结束,则每隔250ms给线程池发布一次需求
static const int block_ms = 250;
while (m_completionEvent.timedWait(block_ms))
tryWakeOne();
}
下面是函数tryWakeOne()的相关代码:首先让包工头(m_pool)找手底下有没有空闲的工人,如果所有工人都已经在干活了,则只设置一下m_helpWanted为true就直接返回了,后续甲方每隔250ms还会再来找包工头的;如果有空闲工人,则让该工人对接甲方(JobProvider),并且激活(worker.awaken)他。
void JobProvider::tryWakeOne()
{
// 先让包工头找一下有没有空闲的工人
int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
if (id < 0) // 没有空闲工人,直接返回
{
m_helpWanted = true;
return;
}
WorkerThread& worker = m_pool->m_workers[id]; // 当前空闲的工人
if (worker.m_curJobProvider != this) /* poaching */
{
sleepbitmap_t bit = (sleepbitmap_t)1 << id;
SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
worker.m_curJobProvider = this; // 将甲方赋值给该工人,方便工人后续按照甲方需求干活
SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
}
worker.awaken(); // 激活该工人
}
下面是工人(WorkerThread)的主函数。在工人的主循环函数中,调用甲方的findJob()来干具体的活。干完之后,会主动找下一个需要帮助(m_helpWanted==true)的甲方,继续干活(可称为主动接单模式)。如果没有找到有需求的甲方了,则进入Sleep状态,等待甲方有需求时主动激活(可称为被动接单模式)。
void WorkerThread::threadMain()
{
THREAD_NAME("Worker", m_id);
#if _WIN32
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
#else
__attribute__((unused)) int val = nice(10);
#endif
m_pool.setCurrentThreadAffinity();
sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
m_curJobProvider = m_pool.m_jpTable[0];
m_bondMaster = NULL;
SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
m_wakeEvent.wait(); // 等待甲方激活工人
while (m_pool.m_isActive)
{
if (m_bondMaster)
{
m_bondMaster->processTasks(m_id);
m_bondMaster->m_exitedPeerCount.incr();
m_bondMaster = NULL;
}
do
{
/* do pending work for current job provider */
m_curJobProvider->findJob(m_id); // 完成甲方提出的需求
/* if the current job provider still wants help, only switch to a
* higher priority provider (lower slice type). Else take the first
* available job provider with the highest priority */
int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
INVALID_SLICE_PRIORITY + 1;
// 主动寻找有需求的甲方,继续干活(主动接单模式)
int nextProvider = -1;
for (int i = 0; i < m_pool.m_numProviders; i++)
{
if (m_pool.m_jpTable[i]->m_helpWanted &&
m_pool.m_jpTable[i]->m_sliceType < curPriority)
{
nextProvider = i;
curPriority = m_pool.m_jpTable[i]->m_sliceType;
}
}
if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
{
SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
m_curJobProvider = m_pool.m_jpTable[nextProvider];
SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
}
}
while (m_curJobProvider->m_helpWanted);
/* While the worker sleeps, a job-provider or bond-group may acquire this
* worker's sleep bitmap bit. Once acquired, that thread may modify
* m_bondMaster or m_curJobProvider, then waken the thread */
SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit); // 所有甲方的需求暂时都完成了,进入休息状态
m_wakeEvent.wait(); // 等待甲方有需求时主动激活工人(被动接单模式)
}
SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
}
WaveFont作为甲方之一,他的需求具体实施过程在WaveFont::findJob()函数中,代码如下。CTZ(m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w])用于找到当前能编的最上边的CTU Row。processRow()用于处理一行的编码或者滤波。
void WaveFront::findJob(int threadId)
{
unsigned long id;
/* Loop over each word until all available rows are finished */
for (int w = 0; w < m_numWords; w++)
{
// oldval中bit为1的位表示同时满足帧内和帧间依赖的CTU行
uint32_t oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
while (oldval)
{
CTZ(id, oldval); // 取值为1的最低的位,表示最上边的能编的CTU行
uint32_t bit = 1 << id;
if (ATOMIC_AND(&m_internalDependencyBitmap[w], ~bit) & bit) // 清除internalDepencyBitmap中该行的bit
{
/* we cleared the bit, we get to process the row */
processRow(w * 32 + id, threadId); // 编码当前CTU行
m_helpWanted = true;
return; /* check for a higher priority task */
}
oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
}
}
m_helpWanted = false;
}
待续......