当前位置: 首页 > 工具软件 > Mem Reduct > 使用案例 >

CUDA Sample中的reduce实现

盖成弘
2023-12-01

我们知道,GPU擅长做并行计算,像element-wise操作。GEMM, Conv这种不仅结果张量中元素的计算相互不依赖,而且输入数据还会被反复利用的更能体现GPU的优势。但AI模型计算或者HPC中还有一类操作由于元素间有数据依赖,会给并行化带来挑战,那就是reduce操作。它代表一类操作,即将多个元素通过某种特定的运算进行归约。其应用很广泛,很多其它算法也以它为基础,如scan, histogram等操作。

最naive的计算方式是序列化地挨个累加。虽然序列化的实现很简单,但无法获得并行处理器带来的好处。为了得到并行计算的好处,需要将计算分布到多个核上。但就像前面说的,序列化的方式由于会导致每个元素计算时都依赖前面的累加结果,难以完全并行。对于reduce,一个基本思路是尽可能地将部分计算并行,以一种树形的结构来分阶段计算最终结果。

CUDA SDK Sample中的关于reduce的例子展示了如何用CUDA高效地进行计算。其中包含了CUDA中的一些特性的使用及技巧。相关代码在cuda-samples项目的Samples/2_Concepts_and_Techniques/reduction目录。Reduce相关的sample主要散落在下面几个子目录中:

  • reduction
  • reductionMultiBlockCG
  • threadFenceReduction

Reduction

对于reduction这个sample,文件reduction/reduction.cpp中的runTest为程序的主入口函数。如果运行时命令行指定--shmoo参数,则会对于1到32M的数据,执行7种不同的kernel。然后生成csv格式的报告。

./reduction --shmoo

这里采用的是two pass reduction的方法,即分两次pass做全部数据的计算。第一个pass是做CTA内的reduce,然后做CTA的local sum做reduce。这几个kernel主要用于第一个pass的。下面看看这几个reduce kernel的实现:

  • reduce0

这是最基础的版本。它对于n(2的幂)个元素的输入数组,使用n个线程,在log(n)步内完成计算。

首先将输入元素从global memory g_idata移到shared memory sdata,每个线程移一个元素。

// load shared mem                                              
unsigned int tid = threadIdx.x;                                 
unsigned int i = blockIdx.x * blockDim.x + threadIdx.x;         
                                                                
sdata[tid] = (i < n) ? g_idata[i] : 0;                          
                                                                
cg::sync(cta);                                                  

然后用log-step reduction在CTA内部做reduction。

// do reduction in shared mem                               
for (unsigned int s = 1; s < blockDim.x; s *= 2) {          
  // modulo arithmetic is slow!                             
  if ((tid % (2 * s)) == 0) {                               
    sdata[tid] += sdata[tid + s];                           
  }                                                         
                                                            
  cg::sync(cta);                                            
}                                                           

最后由CTA的第一个thread将该CTA的partial sum放到输出元素的对应的位置:

// write result for this block to global mem     
if (tid == 0) g_odata[blockIdx.x] = sdata[0];    

它的问题是使用modulo运算确定哪些thread是active的,而这个操作在GPU上很耗时。另一方面,每个warp都不是100%的thread在工作,效率较低。

  • reduce1

该kernel通过更改数据元素与线程的对应关系,去除了modulo运算。

// do reduction in shared mem                                
for (unsigned int s = 1; s < blockDim.x; s *= 2) {           
  int index = 2 * s * tid;                                   
                                                             
  if (index < blockDim.x) {                                  
    sdata[index] += sdata[index + s];                        
  }                                                          
                                                             
  cg::sync(cta);                                             
}                                                            

它与上一个kernel的区别是其中active的thread是连续的。这样一些warp中的thread是完全利用了,但其缺点是数据访问方式会导致shared memory的bank conflicts。

避免bank conflict是GPU的最重要的性能优化点之一。我们知道,为了提升访存的bandwidth,GPU中将shared memory均匀切成相同大小的bank,不同bank是可以同时访问的,这就使得数据访问可以并行起来。但是,一旦发生了bank conflict,意味着同一bank被同时访问,则只能串行访问。有一种例外是warp中所有线程访问同一元素,则会以广播的方式将数据给到所有线程。在compute capability 5.x后,每个bank在每个clock cycle的bandwidth为32 bit。连续的32 bit对应连续的bank。内存地址与bank index的对应公式为bank index = (byte address ÷ 4 bytes/bank) % 32 banks

  • reduce2
    该kernel将interleaved addressing改为sequential addressing:
// do reduction in shared mem                           
for (unsigned int s = blockDim.x / 2; s > 0; s >>= 1) { 
  if (tid < s) {                                        
    sdata[tid] += sdata[tid + s];                       
  }                                                     
                                                        
  cg::sync(cta);                                        
}                                                       

这样相邻的thread访问的是相邻的数据,从而避免了bank conflict。这个kernel有个问题是在第一次迭代中有一半的线程是空闲的。空闲的硬件就是浪费啊。

  • reduce3

前面提到的实现中每个CTA处理的数据量与CTA的线程数一致,而这个kernel中线程数减半,因此每个线程处理的数据会增大一倍。核心部分代码如下:

unsigned int i = blockIdx.x * (blockDim.x * 2) + threadIdx.x;  

T mySum = (i < n) ? g_idata[i] : 0; 

if (i + blockDim.x < n) mySum += g_idata[i + blockDim.x];           
                                                                    
sdata[tid] = mySum;                                                 
cg::sync(cta);                                                      
                                                                    
// do reduction in shared mem                                       
for (unsigned int s = blockDim.x / 2; s > 0; s >>= 1) {             
  if (tid < s) {                                                    
    sdata[tid] = mySum = mySum + sdata[tid + s];                    
  }                                                                 
                                                                    
  cg::sync(cta);                                                    
}                                                                   
  • reduce4

这个kernel使用了warp级的优化。Warp是CUDA上调度和运行的基本单元。在warp范围内的计算有一些特有的优化机会,如由于warp内部线程是同步的,因此可以免去__syncthreads(),还有关于tid的一些判断所带来的开销。

Warp内优化主要手段之一就是warp shuffle操作(Warp shuffle介绍可参考Using CUDA Warp-Level Primitives)。该kernel主要逻辑如下:

// do reduction in shared mem                                                 
for (unsigned int s = blockDim.x / 2; s > 32; s >>= 1) {                      
  if (tid < s) {                                                              
    sdata[tid] = mySum = mySum + sdata[tid + s];                              
  }                                                                           
                                                                              
  cg::sync(cta);                                                              
}                                                                             
                                                                              
cg::thread_block_tile<32> tile32 = cg::tiled_partition<32>(cta);              
                                                                              
if (cta.thread_rank() < 32) {                                                 
  // Fetch final intermediate sum from 2nd warp                               
  if (blockSize >= 64) mySum += sdata[tid + 32];                              
  // Reduce final warp using shuffle                                          
  for (int offset = tile32.size() / 2; offset > 0; offset /= 2) {             
    mySum += tile32.shfl_down(mySum, offset);                                 
  }                                                                           
}                                                                             

注意前面tree-based reduction到32为止,warp size为32,也就是说剩下的是warp级的reduction计算。Warp级计算中使用shfl_down操作在warp内进行tree-based reduction。

  • reduce5

这个kernel主要减少地址计算和循环开销。要达到这个目的,一个常用的方法就是loop unrolling。实现上比较直观,不作展开了。

// do reduction in shared mem                            
if ((blockSize >= 512) && (tid < 256)) {                 
  sdata[tid] = mySum = mySum + sdata[tid + 256];         
}                                                        
                                                         
cg::sync(cta);                                           
                                                         
if ((blockSize >= 256) && (tid < 128)) {                 
  sdata[tid] = mySum = mySum + sdata[tid + 128];         
}                                                        
                                                         
cg::sync(cta);                                           
                                                         
if ((blockSize >= 128) && (tid < 64)) {                  
  sdata[tid] = mySum = mySum + sdata[tid + 64];          
}                                                        
  • reduce6

前面的kernel中元素数量与线程数相等,或者一倍关系。这个kernel中在每个线程中串行累加多个元素。 我们知道,要让GPU达到更好的性能,有两个维度的考量。一个是occupancy,即尽可能多的warp可以让SM有东西可调度,避免SM空闲。这提升了warp间的并行程度。另一个是让线程内有足够多工作,这样可以有更多指令级并行的机会。每个线程干更多活,意味着更少的线程,也意味着更少的warp和CTA。这相当于以occupancy为代价,提升了指令级并行(Instruction-level parallelism)。遗憾的是两者通常是矛盾的,需要我们在实践中找到一个平衡点。具体一个线程干多少活合适,是需要case-by-case分析的。

这部分逻辑主要体现在下面这部分代码中:

// perform first level of reduction,                                         
// reading from global memory, writing to shared memory                      
unsigned int tid = threadIdx.x;                                              
unsigned int gridSize = blockSize * gridDim.x;                               
                                                                             
T mySum = 0;                                                                 
                                                                             
// we reduce multiple elements per thread.  The number is determined by the  
// number of active thread blocks (via gridDim).  More blocks will result    
// in a larger gridSize and therefore fewer elements per thread              
if (nIsPow2) {                                                               
  unsigned int i = blockIdx.x * blockSize * 2 + threadIdx.x;                 
  gridSize = gridSize << 1;                                                  
                                                                             
  while (i < n) {                                                            
    mySum += g_idata[i];                                                     
    // ensure we don't read out of bounds -- this is optimized away for      
    // powerOf2 sized arrays                                                 
    if ((i + blockSize) < n) {                                               
      mySum += g_idata[i + blockSize];                                       
    }                                                                        
    i += gridSize;                                                           
  }                                                                          
} else {                                                                     
  unsigned int i = blockIdx.x * blockSize + threadIdx.x;                     
  while (i < n) {                                                            
    mySum += g_idata[i];                                                     
    i += gridSize;                                                           
  }                                                                          
}                                                                            

其中的while循环用于在线程中累加多个元素。这里有个小优化,如果元素个数是2的幂,每个迭代可以累加两个元素。

  • reduce7

该kernel的前半部分,即first level reduction与前一kernel是类似的。后面的部分利用了warp级别的优化:

// unsigned int maskLength = (blockSize & 31);  // 31 = warpSize-1    
// maskLength = (maskLength > 0) ? (32 - maskLength) : maskLength;    
// const unsigned int mask = (0xffffffff) >> maskLength;              
                                                       
// Reduce within warp using shuffle or reduce_add if T==int & CUDA_ARCH ==          
// SM 8.0                                                                           
mySum = warpReduceSum<T>(mask, mySum);                                              
                                                                                    
// each thread puts its local sum into shared memory                                
if ((tid % warpSize) == 0) {                                                        
  sdata[tid / warpSize] = mySum;                                                    
}                                                                                   
                                                                                    
__syncthreads();                                                                    
                                                                                    
const unsigned int shmem_extent =                                                   
    (blockSize / warpSize) > 0 ? (blockSize / warpSize) : 1;                        
const unsigned int ballot_result = __ballot_sync(mask, tid < shmem_extent);         
if (tid < shmem_extent) {                                                           
  mySum = sdata[tid];                                                               
  // Reduce final warp using shuffle or reduce_add if T==int & CUDA_ARCH ==         
  // SM 8.0                                                                         
  mySum = warpReduceSum<T>(ballot_result, mySum);                                   
}                                                                                   

其中关键的是warpReduceSum()函数,其作用是做warp内的reduction。在SM 8.0前是用的warp shuffle,SM 8.0后可以用__reduce_add_sync

template <class T>                                                              
__device__ __forceinline__ T warpReduceSum(unsigned int mask, T mySum) {        
  for (int offset = warpSize / 2; offset > 0; offset /= 2) {                    
    mySum += __shfl_down_sync(mask, mySum, offset);                             
  }                                                                             
  return mySum;                                                                 
}                                                                               
                                                                                
#if __CUDA_ARCH__ >= 800                                                        
// Specialize warpReduceFunc for int inputs to use __reduce_add_sync intrinsic  
// when on SM 8.0 or higher                                                     
template <>                                                                     
__device__ __forceinline__ int warpReduceSum<int>(unsigned int mask,            
                                                  int mySum) {                  
  mySum = __reduce_add_sync(mask, mySum);                                       
  return mySum;                                                                 
}                                                                               
#endif                                                                          
  • cg_reduce

Cooperative Groups是CUDA 9引入的特性,用于更灵活地组织与同步线程组(可参考Cooperative Groups: Flexible CUDA Thread Programming)。这个kernel实现利用了cooperative group特性中的thread_block_tile,将CTA分成warp。先在CTA范围作reduction,直到warp级别,最后调用cg_reduce_n做warp级别的reduction。

T threadVal = 0;                                            
{                                                           
  unsigned int i = threadIndex;                             
  unsigned int indexStride = (numCtas * ctaSize);           
  while (i < n) {                                           
    threadVal += g_idata[i];                                
    i += indexStride;                                       
  }                                                         
  sdata[threadRank] = threadVal;                            
}                                                           
                                                            
// Wait for all tiles to finish and reduce within CTA       
{                                                           
  unsigned int ctaSteps = tile.meta_group_size();           
  unsigned int ctaIndex = ctaSize >> 1;                     
  while (ctaIndex >= 32) {                                  
    cta.sync();                                             
    if (threadRank < ctaIndex) {                            
      threadVal += sdata[threadRank + ctaIndex];            
      sdata[threadRank] = threadVal;                        
    }                                                       
    ctaSteps >>= 1;                                         
    ctaIndex >>= 1;                                         
  }                                                         
}                                                           
                                                            
// Shuffle redux instead of smem redux                      
{                                                           
  cta.sync();                                               
  if (tile.meta_group_rank() == 0) {                        
    threadVal = cg_reduce_n(threadVal, tile);               
  }                                                         
}                                                           

这里的处理可以大体分为三段:

  1. 处理超过grid的数据。将要处理的数据减少到一个grid。
  2. 在CTA内做reduction,直到warp级别。
  3. 在warp内做reduction。
  • multi_warp_cg_reduce

该kernel实现将CTA与warp间通过cooperative group又分了一层,称为multi warp group。它主要流程分几段:

  1. 处理超过grid的数据。这部分和reduce6基本一致。
  2. 通过cg_reduce_n函数,做multi warp group内的reduction。
  3. 让CTA的每一个thread做在多个multi warp group做reduction。

reductionMultiBlockCG

该kernel实现在Samples/2_Concepts_and_Techniques/reductionMultiBlockCG/reductionMultiBlockCG.cu文件中。函数call_reduceSinglePassMultiBlockCG是kernel的wrapper函数。它通过cudaLaunchCooperativeKernel函数启动kernel reduceSinglePassMultiBlockCG。其中thread与CTA数量由getNumBlocksAndThreads()函数调用cudaOccupancyMaxPotentialBlockSize()函数确定。

该kernel可在一个kernel调用中对任意size的数据做reduction。它是一个single pass的kernel。

for (int i = grid.thread_rank(); i < n; i += grid.size()) {
  sdata[block.thread_rank()] += g_idata[i];
}

cg::sync(block);

// Reduce each block (called once per block)
reduceBlock(sdata, block);
// Write out the result to global memory
if (block.thread_rank() == 0) {
  g_odata[blockIdx.x] = sdata[0];
}
cg::sync(grid);

if (grid.thread_rank() == 0) {
  for (int block = 1; block < gridDim.x; block++) {
    g_odata[0] += g_odata[block];
  }
}

整体流程大体分几个阶段:

  1. 以grid为stride对全部数据做reduction。结果放在shared memory中。
  2. reduceBlock()函数在每个block内做reduction。计算结果(即每个block的partial sum)放在global memory中。
  3. 整个grid的第一个线程,将每个block的partial sum累加起来,得到最终的值。

threadFenceReduction

该kernel实现在Samples/2_Concepts_and_Techniques/threadFenceReduction_kernel.cu文件中。其中reduce()函数是一个wrapper,启动真正的kernel。这里有multi pass与single pass两个实现:


// This function performs a reduction of the input data multiple times and
// measures the average reduction time.

float benchmarkReduce(int n, int numThreads, int numBlocks, int maxThreads,
                      int maxBlocks, int testIterations, bool multiPass,
                      bool cpuFinalReduction, int cpuFinalThreshold,
                      StopWatchInterface *timer, float *h_odata, float *d_idata,
                      float *d_odata) {
    if (multiPass) {
        // execute the kernel
        reduce(n, numThreads, numBlocks, d_idata, d_odata);
        if (cpuFinalReduction) {
            // sum partial sums from each block on CPU
            // copy result from device to host
            for (int i = 0; i < numBlocks; i++) {
                gpu_result += h_odata[i];
            }
        } else {
            // sum partial block sums on GPU
            int s = numBlocks;
            
            while (s > cpuFinalThreshold) {
                int threads = 0, blocks = 0;
                getNumBlocksAndThreads(s, maxBlocks, maxThreads, blocks, threads);
                reduce(s, threads, blocks, d_odata, d_odata);
                s = s / (threads * 2);
            }
        }
    } else {
        // execute the kernel 
        reduceSinglePass(n, numThreads, numBlocks, d_idata, d_odata);
        
    }
    cudaDeviceSynchronize();
}

对于multi pass的情况,首先调用reduce()函数计算CTA中的partial sum。

template <unsigned int blockSize, bool nIsPow2>
__global__ void reduceMultiPass(const float *g_idata, float *g_odata,
                                unsigned int n) {
  // Handle to thread block group
  cg::thread_block cta = cg::this_thread_block();
  reduceBlocks<blockSize, nIsPow2>(g_idata, g_odata, n, cta);
}


// Wrapper function for kernel launch

extern "C" void reduce(int size, int threads, int blocks, float *d_idata,
                       float *d_odata) {
  dim3 dimBlock(threads, 1, 1);
  dim3 dimGrid(blocks, 1, 1);
  int smemSize =
      (threads <= 32) ? 2 * threads * sizeof(float) : threads * sizeof(float);

  // choose which of the optimized versions of reduction to launch
  if (isPow2(size)) {
    switch (threads) {
      case 512:
        reduceMultiPass<512, true>
            <<<dimGrid, dimBlock, smemSize>>>(d_idata, d_odata, size);
        break;

然后计将partial sum累加成最终的结果。这步如果在CPU上算,比较方便,将前面计算的partial sum从GPU移到CPU,然后累加就完了。如果是GPU上算,则需要再调用reduce()函数做reduction。如果一把做不完,还得继续,从而构成循环。

对于single pass的情况,它通过atomic operation与shared memory实现single pass。即只需一次kernel调用。在device memory中记录哪个CTA已写入partial sum,当所有CTA完成后,由一个CTA执行最后的log-step reduction。

template <unsigned int blockSize, bool nIsPow2>
__global__ void reduceSinglePass(const float *g_idata, float *g_odata,
                                 unsigned int n) {
    //
    // PHASE 1: Process all inputs assigned to this block
    //
    reduceBlocks<blockSize, nIsPow2>(g_idata, g_odata, n, cta);
    
    //
    // PHASE 2: Last block finished will process all partial sums
    //
    if (gridDim.x > 1) {
        // wait until all outstanding memory instructions in this thread are
        // finished
        __threadfence();
        
        // Thread 0 takes a ticket                                             
        if (tid == 0) {                                                        
        unsigned int ticket = atomicInc(&retirementCount, gridDim.x);        
        // If the ticket ID is equal to the number of blocks, we are the last
        // block!                                                            
        amLast = (ticket == gridDim.x - 1);                                  
        
        cg::sync(cta);     
        // The last block sums the results of all other blocks 
        if (amLast) {
            while (i < gridDim.x) {                                
                mySum += g_odata[i];                                 
                i += blockSize;                                      
            }                                                      
                                                             
            reduceBlock<blockSize>(smem, mySum, tid, cta);         
                                                             
            if (tid == 0) {                                        
                g_odata[0] = smem[0];                                
                                                             
                // reset retirement count so that next run succeeds  
                retirementCount = 0;                                 
            }                                                      
        }
    }
}          

extern "C" void reduceSinglePass(int size, int threads, int blocks,
                                 float *d_idata, float *d_odata) {
  dim3 dimBlock(threads, 1, 1);
  dim3 dimGrid(blocks, 1, 1);
  int smemSize = threads * sizeof(float);

  // choose which of the optimized versions of reduction to launch
  if (isPow2(size)) {
    switch (threads) {
      case 512:
        reduceSinglePass<512, true>
            <<<dimGrid, dimBlock, smemSize>>>(d_idata, d_odata, size);
        break;

在第一个阶段中,在CTA范围内算partial sum。第二个阶段,先判断是否CTA大于1个。如果总共就一个CTA,那partial sum就是total sum,到这就完事了。否则的话,先用__threadfence()保证到这时grid中前面的内存事务本线程可见。假设现在有n个CTA。我们只需要一个CTA就行,因此让每个CTA的0号线程拿ticket(通过atomicInc()函数实现),拿到第n张ticket的就是最后一个到这的CTA的0号线程。它会将shared memory中的变量amLast置为true,这样通过判断这个变量就可以只让被选中的CTA干活。这个被选中的CTA先将超过thread block size的数据累加,这样将要处理的数据个数缩小到thread block size以内,再调用reduceBlock这个kernel,对shared memory中的数据进行求和。这里的kernel reduceBlock先将CTA切成warp,在warp内部做累加,最后内CTA的0号线程做CTA范围的累加。该kernel执行完成后,最后由0号线程将最终结果写入到global memory中。

尽管现实中很多时候要计算reduce时,为了易用性,可维护性等因素我们会直接使用一些库,如cub,thrust。但是,学习cuda-sample中的相关实现仍可以帮助我们了解一些CUDA特性的用法,以及理解kernel优化中的一些注意点。

 类似资料: