@aspurdy
Last active March 9, 2023 06:56
match_any_sync shim for compute capability < 7.0
// warp-aggregated atomic multi bucket increment based on CUDA samples, modified to support compute capability < 7.0
#include "helper_cuda.h"
#include <cooperative_groups.h>
#include <vector>
#define NUM_ELEMS 10000000
#define NUM_THREADS_PER_BLOCK 512
using namespace cooperative_groups;
// NB could possibly be optimized if the number of values pred can take on is less than warpSize.
__device__ unsigned int match_any_sync(unsigned int active_mask, unsigned int pred, unsigned int lane) {
#if __CUDA_ARCH__ >= 700
return __match_any_sync(active_mask, pred);
#else
unsigned sub_mask;
// For each thread lane in the warp
for (int i = 0; i < warpSize; ++i) {
// Get thread lane i's value of pred
unsigned int pred_i = __shfl_sync(active_mask, pred, i);
// Get a bit mask representing which threads also share the same value of pred
unsigned pred_i_mask = __ballot_sync(active_mask, (pred_i == pred));
if (i == lane) {
// We've computed the mask for the current lane.
sub_mask = pred_i_mask;
}
}
return sub_mask;
#endif
}
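// Example: in a fully active warp where only lanes 0, 3 and 5 hold pred == 2, each of those lanes receives
// sub_mask == 0b101001 (bits 0, 3 and 5 set), matching what the native __match_any_sync would return on
// compute capability >= 7.0. The emulation costs warpSize shuffle/ballot iterations per call.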
// NB: due to warp divergence, a single warp may end up issuing more than one atomic increment per bucket.
// TODO: test under divergent conditions?
__device__ unsigned int warp_aggregated_atomic_bucket_inc(const int bucket, int* counter) {
// Reading special registers costly?
// https://stackoverflow.com/questions/44337309/whats-the-most-efficient-way-to-calculate-the-warp-id-lane-id-in-a-1-d-grid
// https://forums.developer.nvidia.com/t/how-costly-is-the-s2r-instruction-reading-a-special-register/50472#5165296
// unsigned int lane = __popc(active_mask & details::lanemask32_lt());
// Assuming warp size is a power of two
unsigned int lane = threadIdx.x & (warpSize - 1);
// Get a mask of all active threads with same bucket value.
unsigned int sub_mask = match_any_sync(__activemask(), bucket, lane);
// Get the lane of the lowest-indexed thread in the sub-group sharing this bucket value. This lane is
// responsible for atomically incrementing the bucket counter.
unsigned int leader_lane = __ffs(sub_mask) - 1;
// Get the rank of the current thread within its sub-group (the threads sharing the same bucket value).
unsigned int sub_thread_rank = __popc(sub_mask & details::lanemask32_lt());
// The total number of active threads in the warp that share the same bucket value.
unsigned int sub_tile_size = __popc(sub_mask);
int val = 0;
if (lane == leader_lane) {
val = atomicAdd(&counter[bucket], sub_tile_size);
}
// Broadcast previous counter value read by leader to other active threads.
val = __shfl_sync(sub_mask, val, leader_lane);
// Each thread computes its own value based on its thread rank
return val + sub_thread_rank;
}
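// Example: if lanes 0, 3 and 5 of a warp all target bucket 2 and counter[2] currently holds 100, the leader
// (lane 0) performs a single atomicAdd of 3 and reads back 100; lanes 0, 3 and 5 then return 100, 101 and 102
// respectively, so the warp issues one atomic operation instead of three.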
// Write value indices out to corresponding buckets.
__global__ void map_to_buckets(const int* values, int* bucketed_indices, int* bucket_counters, const int num_values,
const int num_buckets) {
grid_group grid = this_grid();
for (int i = grid.thread_rank(); i < num_values; i += grid.size()) {
const int bucket = values[i];
if (bucket < num_buckets) {
unsigned bucket_index = warp_aggregated_atomic_bucket_inc(bucket, bucket_counters);
bucketed_indices[bucket_index] = i;
}
}
}
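// NB: values >= num_buckets are skipped by the kernel above; their indices are not written to any bucket.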
int main(int argc, char** argv) {
std::vector<int> host_data(NUM_ELEMS);
const int num_buckets = 5;
// Generate input data.
for (int i = 0; i < NUM_ELEMS; ++i) {
host_data[i] = (unsigned)rand() % num_buckets;
}
int* dev_data;
checkCudaErrors(cudaMalloc(&dev_data, sizeof(int) * NUM_ELEMS));
checkCudaErrors(cudaMemcpy(dev_data, host_data.data(), sizeof(int) * NUM_ELEMS, cudaMemcpyHostToDevice));
int *dev_indices_buckets, *dev_bucket_counters;
std::vector<int> host_bucket_counters(num_buckets);
std::vector<int> cpu_bucket_counters(num_buckets);
for (int i = 0; i < num_buckets; ++i) {
// In this particular example we write the bucket-assigned indices out to the same output array, so we need to
// precalculate the counter values, corresponding to offsets into the output array, for each bucket based on the
// worst case where all indices get assigned to the same bucket.
host_bucket_counters[i] = i * NUM_ELEMS;
}
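// With these starting values, bucket i's region of the shared output array begins at offset i * NUM_ELEMS.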
// Worst-case allocation
checkCudaErrors(cudaMalloc(&dev_indices_buckets, sizeof(int) * NUM_ELEMS * num_buckets));
checkCudaErrors(cudaMalloc(&dev_bucket_counters, sizeof(int) * num_buckets));
checkCudaErrors(cudaMemcpy(dev_bucket_counters, host_bucket_counters.data(), sizeof(int) * num_buckets,
cudaMemcpyHostToDevice));
dim3 dim_block(NUM_THREADS_PER_BLOCK, 1, 1);
dim3 dim_grid((NUM_ELEMS / NUM_THREADS_PER_BLOCK), 1, 1);
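// NB: the integer division truncates; the grid-stride loop in map_to_buckets covers the remaining elements.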
map_to_buckets<<<dim_grid, dim_block>>>(dev_data, dev_indices_buckets, dev_bucket_counters, NUM_ELEMS, num_buckets);
checkCudaErrors(cudaGetLastError());
checkCudaErrors(cudaMemcpy(host_bucket_counters.data(), dev_bucket_counters, sizeof(int) * num_buckets,
cudaMemcpyDeviceToHost));
for (int i = 0; i < NUM_ELEMS; i++) {
++cpu_bucket_counters[host_data[i]];
}
bool all_match = true;
for (int i = 0; i < num_buckets; i++) {
if (cpu_bucket_counters[i] != host_bucket_counters[i] - i * NUM_ELEMS) {
all_match = false;
break;
}
}
if (all_match) {
printf("Test passed.\n");
} else {
printf("Test failed.\n");
}
checkCudaErrors(cudaFree(dev_data));
checkCudaErrors(cudaFree(dev_bucket_counters));
checkCudaErrors(cudaFree(dev_indices_buckets));
return 0;
}
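Build note (a sketch, assuming helper_cuda.h comes from the CUDA samples' Common include directory and the file is saved as warp_aggregated_bucket_inc.cu): nvcc -I<cuda-samples>/Common -arch=sm_60 warp_aggregated_bucket_inc.cu -o warp_aggregated_bucket_inc. Building for an architecture below sm_70 exercises the shuffle/ballot shim; sm_70 or newer compiles the native __match_any_sync path instead. Depending on the toolkit version, -rdc=true may also be needed for cooperative_groups::this_grid().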