我们知道,GPU擅长做并行计算,像element-wise操作。GEMM, Conv这种不仅结果张量中元素的计算相互不依赖,而且输入数据还会被反复利用的更能体现GPU的优势。但AI模型计算或者HPC中还有一类操作由于元素间有数据依赖,会给并行化带来挑战,那就是reduce操作。它代表一类操作,即将多个元素通过某种特定的运算进行归约。其应用很广泛,很多其它算法也以它为基础,如scan, histogram等操作。
最naive的计算方式是序列化地挨个累加。虽然序列化的实现很简单,但无法获得并行处理器带来的好处。为了得到并行计算的好处,需要将计算分布到多个核上。但就像前面说的,序列化的方式由于会导致每个元素计算时都依赖前面的累加结果,难以完全并行。对于reduce,一个基本思路是尽可能地将部分计算并行,以一种树形的结构来分阶段计算最终结果。
CUDA SDK Sample中的关于reduce的例子展示了如何用CUDA高效地进行计算。其中包含了CUDA中的一些特性的使用及技巧。相关代码在cuda-samples项目的Samples/2_Concepts_and_Techniques/reduction
目录。Reduce相关的sample主要散落在下面几个子目录中:
reduction
reductionMultiBlockCG
threadFenceReduction
对于reduction
这个sample,文件reduction/reduction.cpp
中的runTest
为程序的主入口函数。如果运行时命令行指定--shmoo
参数,则会对于1到32M的数据,执行7种不同的kernel。然后生成csv格式的报告。
./reduction --shmoo
这里采用的是two pass reduction的方法,即分两次pass做全部数据的计算。第一个pass是做CTA内的reduce,然后做CTA的local sum做reduce。这几个kernel主要用于第一个pass的。下面看看这几个reduce kernel的实现:
这是最基础的版本。它对于n(2的幂)个元素的输入数组,使用n个线程,在log(n)步内完成计算。
首先将输入元素从global memory g_idata
移到shared memory sdata
,每个线程移一个元素。
// load shared mem
unsigned int tid = threadIdx.x;
unsigned int i = blockIdx.x * blockDim.x + threadIdx.x;
sdata[tid] = (i < n) ? g_idata[i] : 0;
cg::sync(cta);
然后用log-step reduction在CTA内部做reduction。
// do reduction in shared mem
for (unsigned int s = 1; s < blockDim.x; s *= 2) {
// modulo arithmetic is slow!
if ((tid % (2 * s)) == 0) {
sdata[tid] += sdata[tid + s];
}
cg::sync(cta);
}
最后由CTA的第一个thread将该CTA的partial sum放到输出元素的对应的位置:
// write result for this block to global mem
if (tid == 0) g_odata[blockIdx.x] = sdata[0];
它的问题是使用modulo运算确定哪些thread是active的,而这个操作在GPU上很耗时。另一方面,每个warp都不是100%的thread在工作,效率较低。
该kernel通过更改数据元素与线程的对应关系,去除了modulo运算。
// do reduction in shared mem
for (unsigned int s = 1; s < blockDim.x; s *= 2) {
int index = 2 * s * tid;
if (index < blockDim.x) {
sdata[index] += sdata[index + s];
}
cg::sync(cta);
}
它与上一个kernel的区别是其中active的thread是连续的。这样一些warp中的thread是完全利用了,但其缺点是数据访问方式会导致shared memory的bank conflicts。
避免bank conflict是GPU的最重要的性能优化点之一。我们知道,为了提升访存的bandwidth,GPU中将shared memory均匀切成相同大小的bank,不同bank是可以同时访问的,这就使得数据访问可以并行起来。但是,一旦发生了bank conflict,意味着同一bank被同时访问,则只能串行访问。有一种例外是warp中所有线程访问同一元素,则会以广播的方式将数据给到所有线程。在compute capability 5.x后,每个bank在每个clock cycle的bandwidth为32 bit。连续的32 bit对应连续的bank。内存地址与bank index的对应公式为bank index = (byte address ÷ 4 bytes/bank) % 32 banks
。
// do reduction in shared mem
for (unsigned int s = blockDim.x / 2; s > 0; s >>= 1) {
if (tid < s) {
sdata[tid] += sdata[tid + s];
}
cg::sync(cta);
}
这样相邻的thread访问的是相邻的数据,从而避免了bank conflict。这个kernel有个问题是在第一次迭代中有一半的线程是空闲的。空闲的硬件就是浪费啊。
前面提到的实现中每个CTA处理的数据量与CTA的线程数一致,而这个kernel中线程数减半,因此每个线程处理的数据会增大一倍。核心部分代码如下:
unsigned int i = blockIdx.x * (blockDim.x * 2) + threadIdx.x;
T mySum = (i < n) ? g_idata[i] : 0;
if (i + blockDim.x < n) mySum += g_idata[i + blockDim.x];
sdata[tid] = mySum;
cg::sync(cta);
// do reduction in shared mem
for (unsigned int s = blockDim.x / 2; s > 0; s >>= 1) {
if (tid < s) {
sdata[tid] = mySum = mySum + sdata[tid + s];
}
cg::sync(cta);
}
这个kernel使用了warp级的优化。Warp是CUDA上调度和运行的基本单元。在warp范围内的计算有一些特有的优化机会,如由于warp内部线程是同步的,因此可以免去__syncthreads()
,还有关于tid的一些判断所带来的开销。
Warp内优化主要手段之一就是warp shuffle操作(Warp shuffle介绍可参考Using CUDA Warp-Level Primitives)。该kernel主要逻辑如下:
// do reduction in shared mem
for (unsigned int s = blockDim.x / 2; s > 32; s >>= 1) {
if (tid < s) {
sdata[tid] = mySum = mySum + sdata[tid + s];
}
cg::sync(cta);
}
cg::thread_block_tile<32> tile32 = cg::tiled_partition<32>(cta);
if (cta.thread_rank() < 32) {
// Fetch final intermediate sum from 2nd warp
if (blockSize >= 64) mySum += sdata[tid + 32];
// Reduce final warp using shuffle
for (int offset = tile32.size() / 2; offset > 0; offset /= 2) {
mySum += tile32.shfl_down(mySum, offset);
}
}
注意前面tree-based reduction到32为止,warp size为32,也就是说剩下的是warp级的reduction计算。Warp级计算中使用shfl_down
操作在warp内进行tree-based reduction。
这个kernel主要减少地址计算和循环开销。要达到这个目的,一个常用的方法就是loop unrolling。实现上比较直观,不作展开了。
// do reduction in shared mem
if ((blockSize >= 512) && (tid < 256)) {
sdata[tid] = mySum = mySum + sdata[tid + 256];
}
cg::sync(cta);
if ((blockSize >= 256) && (tid < 128)) {
sdata[tid] = mySum = mySum + sdata[tid + 128];
}
cg::sync(cta);
if ((blockSize >= 128) && (tid < 64)) {
sdata[tid] = mySum = mySum + sdata[tid + 64];
}
前面的kernel中元素数量与线程数相等,或者一倍关系。这个kernel中在每个线程中串行累加多个元素。 我们知道,要让GPU达到更好的性能,有两个维度的考量。一个是occupancy,即尽可能多的warp可以让SM有东西可调度,避免SM空闲。这提升了warp间的并行程度。另一个是让线程内有足够多工作,这样可以有更多指令级并行的机会。每个线程干更多活,意味着更少的线程,也意味着更少的warp和CTA。这相当于以occupancy为代价,提升了指令级并行(Instruction-level parallelism)。遗憾的是两者通常是矛盾的,需要我们在实践中找到一个平衡点。具体一个线程干多少活合适,是需要case-by-case分析的。
这部分逻辑主要体现在下面这部分代码中:
// perform first level of reduction,
// reading from global memory, writing to shared memory
unsigned int tid = threadIdx.x;
unsigned int gridSize = blockSize * gridDim.x;
T mySum = 0;
// we reduce multiple elements per thread. The number is determined by the
// number of active thread blocks (via gridDim). More blocks will result
// in a larger gridSize and therefore fewer elements per thread
if (nIsPow2) {
unsigned int i = blockIdx.x * blockSize * 2 + threadIdx.x;
gridSize = gridSize << 1;
while (i < n) {
mySum += g_idata[i];
// ensure we don't read out of bounds -- this is optimized away for
// powerOf2 sized arrays
if ((i + blockSize) < n) {
mySum += g_idata[i + blockSize];
}
i += gridSize;
}
} else {
unsigned int i = blockIdx.x * blockSize + threadIdx.x;
while (i < n) {
mySum += g_idata[i];
i += gridSize;
}
}
其中的while循环用于在线程中累加多个元素。这里有个小优化,如果元素个数是2的幂,每个迭代可以累加两个元素。
该kernel的前半部分,即first level reduction与前一kernel是类似的。后面的部分利用了warp级别的优化:
// unsigned int maskLength = (blockSize & 31); // 31 = warpSize-1
// maskLength = (maskLength > 0) ? (32 - maskLength) : maskLength;
// const unsigned int mask = (0xffffffff) >> maskLength;
// Reduce within warp using shuffle or reduce_add if T==int & CUDA_ARCH ==
// SM 8.0
mySum = warpReduceSum<T>(mask, mySum);
// each thread puts its local sum into shared memory
if ((tid % warpSize) == 0) {
sdata[tid / warpSize] = mySum;
}
__syncthreads();
const unsigned int shmem_extent =
(blockSize / warpSize) > 0 ? (blockSize / warpSize) : 1;
const unsigned int ballot_result = __ballot_sync(mask, tid < shmem_extent);
if (tid < shmem_extent) {
mySum = sdata[tid];
// Reduce final warp using shuffle or reduce_add if T==int & CUDA_ARCH ==
// SM 8.0
mySum = warpReduceSum<T>(ballot_result, mySum);
}
其中关键的是warpReduceSum()
函数,其作用是做warp内的reduction。在SM 8.0前是用的warp shuffle,SM 8.0后可以用__reduce_add_sync
。
template <class T>
__device__ __forceinline__ T warpReduceSum(unsigned int mask, T mySum) {
for (int offset = warpSize / 2; offset > 0; offset /= 2) {
mySum += __shfl_down_sync(mask, mySum, offset);
}
return mySum;
}
#if __CUDA_ARCH__ >= 800
// Specialize warpReduceFunc for int inputs to use __reduce_add_sync intrinsic
// when on SM 8.0 or higher
template <>
__device__ __forceinline__ int warpReduceSum<int>(unsigned int mask,
int mySum) {
mySum = __reduce_add_sync(mask, mySum);
return mySum;
}
#endif
Cooperative Groups是CUDA 9引入的特性,用于更灵活地组织与同步线程组(可参考Cooperative Groups: Flexible CUDA Thread Programming)。这个kernel实现利用了cooperative group
特性中的thread_block_tile
,将CTA分成warp。先在CTA范围作reduction,直到warp级别,最后调用cg_reduce_n
做warp级别的reduction。
T threadVal = 0;
{
unsigned int i = threadIndex;
unsigned int indexStride = (numCtas * ctaSize);
while (i < n) {
threadVal += g_idata[i];
i += indexStride;
}
sdata[threadRank] = threadVal;
}
// Wait for all tiles to finish and reduce within CTA
{
unsigned int ctaSteps = tile.meta_group_size();
unsigned int ctaIndex = ctaSize >> 1;
while (ctaIndex >= 32) {
cta.sync();
if (threadRank < ctaIndex) {
threadVal += sdata[threadRank + ctaIndex];
sdata[threadRank] = threadVal;
}
ctaSteps >>= 1;
ctaIndex >>= 1;
}
}
// Shuffle redux instead of smem redux
{
cta.sync();
if (tile.meta_group_rank() == 0) {
threadVal = cg_reduce_n(threadVal, tile);
}
}
这里的处理可以大体分为三段:
该kernel实现将CTA与warp间通过cooperative group又分了一层,称为multi warp group。它主要流程分几段:
cg_reduce_n
函数,做multi warp group内的reduction。该kernel实现在Samples/2_Concepts_and_Techniques/reductionMultiBlockCG/reductionMultiBlockCG.cu
文件中。函数call_reduceSinglePassMultiBlockCG
是kernel的wrapper函数。它通过cudaLaunchCooperativeKernel
函数启动kernel reduceSinglePassMultiBlockCG
。其中thread与CTA数量由getNumBlocksAndThreads()
函数调用cudaOccupancyMaxPotentialBlockSize()
函数确定。
该kernel可在一个kernel调用中对任意size的数据做reduction。它是一个single pass的kernel。
for (int i = grid.thread_rank(); i < n; i += grid.size()) {
sdata[block.thread_rank()] += g_idata[i];
}
cg::sync(block);
// Reduce each block (called once per block)
reduceBlock(sdata, block);
// Write out the result to global memory
if (block.thread_rank() == 0) {
g_odata[blockIdx.x] = sdata[0];
}
cg::sync(grid);
if (grid.thread_rank() == 0) {
for (int block = 1; block < gridDim.x; block++) {
g_odata[0] += g_odata[block];
}
}
整体流程大体分几个阶段:
reduceBlock()
函数在每个block内做reduction。计算结果(即每个block的partial sum)放在global memory中。该kernel实现在Samples/2_Concepts_and_Techniques/threadFenceReduction_kernel.cu
文件中。其中reduce()
函数是一个wrapper,启动真正的kernel。这里有multi pass与single pass两个实现:
// This function performs a reduction of the input data multiple times and
// measures the average reduction time.
float benchmarkReduce(int n, int numThreads, int numBlocks, int maxThreads,
int maxBlocks, int testIterations, bool multiPass,
bool cpuFinalReduction, int cpuFinalThreshold,
StopWatchInterface *timer, float *h_odata, float *d_idata,
float *d_odata) {
if (multiPass) {
// execute the kernel
reduce(n, numThreads, numBlocks, d_idata, d_odata);
if (cpuFinalReduction) {
// sum partial sums from each block on CPU
// copy result from device to host
for (int i = 0; i < numBlocks; i++) {
gpu_result += h_odata[i];
}
} else {
// sum partial block sums on GPU
int s = numBlocks;
while (s > cpuFinalThreshold) {
int threads = 0, blocks = 0;
getNumBlocksAndThreads(s, maxBlocks, maxThreads, blocks, threads);
reduce(s, threads, blocks, d_odata, d_odata);
s = s / (threads * 2);
}
}
} else {
// execute the kernel
reduceSinglePass(n, numThreads, numBlocks, d_idata, d_odata);
}
cudaDeviceSynchronize();
}
对于multi pass的情况,首先调用reduce()
函数计算CTA中的partial sum。
template <unsigned int blockSize, bool nIsPow2>
__global__ void reduceMultiPass(const float *g_idata, float *g_odata,
unsigned int n) {
// Handle to thread block group
cg::thread_block cta = cg::this_thread_block();
reduceBlocks<blockSize, nIsPow2>(g_idata, g_odata, n, cta);
}
// Wrapper function for kernel launch
extern "C" void reduce(int size, int threads, int blocks, float *d_idata,
float *d_odata) {
dim3 dimBlock(threads, 1, 1);
dim3 dimGrid(blocks, 1, 1);
int smemSize =
(threads <= 32) ? 2 * threads * sizeof(float) : threads * sizeof(float);
// choose which of the optimized versions of reduction to launch
if (isPow2(size)) {
switch (threads) {
case 512:
reduceMultiPass<512, true>
<<<dimGrid, dimBlock, smemSize>>>(d_idata, d_odata, size);
break;
然后计将partial sum累加成最终的结果。这步如果在CPU上算,比较方便,将前面计算的partial sum从GPU移到CPU,然后累加就完了。如果是GPU上算,则需要再调用reduce()
函数做reduction。如果一把做不完,还得继续,从而构成循环。
对于single pass的情况,它通过atomic operation与shared memory实现single pass。即只需一次kernel调用。在device memory中记录哪个CTA已写入partial sum,当所有CTA完成后,由一个CTA执行最后的log-step reduction。
template <unsigned int blockSize, bool nIsPow2>
__global__ void reduceSinglePass(const float *g_idata, float *g_odata,
unsigned int n) {
//
// PHASE 1: Process all inputs assigned to this block
//
reduceBlocks<blockSize, nIsPow2>(g_idata, g_odata, n, cta);
//
// PHASE 2: Last block finished will process all partial sums
//
if (gridDim.x > 1) {
// wait until all outstanding memory instructions in this thread are
// finished
__threadfence();
// Thread 0 takes a ticket
if (tid == 0) {
unsigned int ticket = atomicInc(&retirementCount, gridDim.x);
// If the ticket ID is equal to the number of blocks, we are the last
// block!
amLast = (ticket == gridDim.x - 1);
cg::sync(cta);
// The last block sums the results of all other blocks
if (amLast) {
while (i < gridDim.x) {
mySum += g_odata[i];
i += blockSize;
}
reduceBlock<blockSize>(smem, mySum, tid, cta);
if (tid == 0) {
g_odata[0] = smem[0];
// reset retirement count so that next run succeeds
retirementCount = 0;
}
}
}
}
extern "C" void reduceSinglePass(int size, int threads, int blocks,
float *d_idata, float *d_odata) {
dim3 dimBlock(threads, 1, 1);
dim3 dimGrid(blocks, 1, 1);
int smemSize = threads * sizeof(float);
// choose which of the optimized versions of reduction to launch
if (isPow2(size)) {
switch (threads) {
case 512:
reduceSinglePass<512, true>
<<<dimGrid, dimBlock, smemSize>>>(d_idata, d_odata, size);
break;
在第一个阶段中,在CTA范围内算partial sum。第二个阶段,先判断是否CTA大于1个。如果总共就一个CTA,那partial sum就是total sum,到这就完事了。否则的话,先用__threadfence()
保证到这时grid中前面的内存事务本线程可见。假设现在有n个CTA。我们只需要一个CTA就行,因此让每个CTA的0号线程拿ticket(通过atomicInc()
函数实现),拿到第n张ticket的就是最后一个到这的CTA的0号线程。它会将shared memory中的变量amLast
置为true,这样通过判断这个变量就可以只让被选中的CTA干活。这个被选中的CTA先将超过thread block size的数据累加,这样将要处理的数据个数缩小到thread block size以内,再调用reduceBlock
这个kernel,对shared memory中的数据进行求和。这里的kernel reduceBlock
先将CTA切成warp,在warp内部做累加,最后内CTA的0号线程做CTA范围的累加。该kernel执行完成后,最后由0号线程将最终结果写入到global memory中。
尽管现实中很多时候要计算reduce时,为了易用性,可维护性等因素我们会直接使用一些库,如cub,thrust。但是,学习cuda-sample中的相关实现仍可以帮助我们了解一些CUDA特性的用法,以及理解kernel优化中的一些注意点。