NVSHMEMTileReduceBenchmark Class — pytorch Architecture
Architecture documentation for the NVSHMEMTileReduceBenchmark class in bench_nvshmem_tile_reduce.py from the pytorch codebase.
Entity Profile
Source Code
benchmarks/distributed/bench_nvshmem_tile_reduce.py lines 40–182
class NVSHMEMTileReduceBenchmark(MultiProcContinuousTest):
    def _init_device(self) -> None:
        # TODO: relieve this (seems to hang if without)
        device_module.set_device(self.device)
        # Set NVSHMEM as SymmMem backend
        symm_mem.set_backend("NVSHMEM")

    @property
    def device(self) -> torch.device:
        return torch.device(device_type, self.rank)

    def _benchmark_tile_reduce_single(
        self,
        full_size: int,
        tile_size: int,
        warmup_iters: int = 5,
        bench_iters: int = 10,
    ) -> dict:
"""
Benchmark a single configuration of tile reduce.
Args:
full_size: Size of the full matrix (full_size x full_size)
warmup_iters: Number of warmup iterations
bench_iters: Number of benchmark iterations
Returns:
Dictionary with benchmark results
"""
        self._init_device()

        group_name = dist.group.WORLD.group_name
        dtype = torch.float

        # Allocate full matrices
        full_inp = symm_mem.empty(
            full_size, full_size, dtype=dtype, device=self.device
        ).fill_(self.rank)
        full_out = symm_mem.empty(
            full_size, full_size, dtype=dtype, device=self.device
        ).fill_(0)

        slice_ut = slice(0, tile_size)
        inp_tile = full_inp[slice_ut, slice_ut]
        out_tile = full_out[slice_ut, slice_ut]

        root = 0

        # Warmup iterations
        for _ in range(warmup_iters):
            torch.ops.symm_mem.tile_reduce(inp_tile, out_tile, root, group_name)
        torch.cuda.synchronize(self.device)

        # Benchmark iterations
        times = []
        dist.barrier()
        torch.cuda.synchronize(self.device)

        start_time = time.perf_counter()
        for _ in range(bench_iters):
            torch.ops.symm_mem.tile_reduce(inp_tile, out_tile, root, group_name)
        torch.cuda.synchronize(self.device)
        end_time = time.perf_counter()
        times.append((end_time - start_time) / bench_iters)

        # Calculate statistics
        times = torch.tensor(times, dtype=torch.float64)
        tile_elements = tile_size * tile_size
        tile_bytes = (
            tile_elements * dtype.itemsize
            if hasattr(dtype, "itemsize")
            else tile_elements * 4
        )

        results = {
            "full_size": full_size,
            "tile_size": tile_size,
            "tile_elements": tile_elements,
            "tile_bytes": tile_bytes,
            "world_size": self.world_size,
            "mean_time_ms": times.mean().item() * 1000,
            "std_time_ms": times.std().item() * 1000,
            "min_time_ms": times.min().item() * 1000,
            "max_time_ms": times.max().item() * 1000,
            "throughput_gb_s": tile_bytes / (times.mean().item() * 1e9),
            "elements_per_sec": tile_elements / times.mean().item(),
        }

        return results

    @skipIfRocm
    def test_benchmark_tile_reduce_various_sizes(self) -> None:
        """
        Benchmark tile reduce across various matrix sizes.
        """
        # Test various matrix sizes
        tile_sizes = [512, 1024, 2048, 4096, 8192, 16384]
        full_size = tile_sizes[-1]
        warmup_iters = 5
        bench_iters = 20

        results = []

        for tile_size in tile_sizes:
            try:
                result = self._benchmark_tile_reduce_single(
                    full_size, tile_size, warmup_iters, bench_iters
                )
                results.append(result)

                if self.rank == 0:
                    print(
                        f"Matrix Size: {full_size}x{full_size}, Tile Size: {tile_size}x{tile_size}"
                    )
                    print(
                        f"  Mean Time: {result['mean_time_ms']:.3f} ± {result['std_time_ms']:.3f} ms"
                    )
                    print(f"  Throughput: {result['throughput_gb_s']:.2f} GB/s")
                    print(f"  Bytes: {result['tile_bytes']:.0f}")
                    print()

            except Exception as e:
                if self.rank == 0:
                    print(f"Failed to benchmark matrix size {full_size}: {e}")

        # Print summary
        if self.rank == 0 and results:
            print("=== BENCHMARK SUMMARY ===")
            print(
                f"{'Matrix Size':<12} {'Tile Size':<10} {'Time (ms)':<12} {'Throughput (GB/s)':<18} {'Bytes':<15}"
            )
            print("-" * 70)

            for result in results:
                print(
                    f"{result['full_size']}x{result['full_size']:<7} "
                    f"{result['tile_size']}x{result['tile_size']:<5} "
                    f"{result['mean_time_ms']:<12.3f} "
                    f"{result['throughput_gb_s']:<18.2f} "
                    f"{result['tile_bytes']:<15.0f}"
                )
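Usage Sketch
The class above is driven by PyTorch's MultiProcContinuousTest harness, which spawns one process per device and runs the test method in each rank. For orientation only, the following is a minimal standalone sketch of the call pattern the benchmark times, assuming a CUDA build of PyTorch with NVSHMEM-enabled symmetric memory, torch.distributed._symmetric_memory imported as symm_mem (as the listing implies), a torchrun launch with one process per GPU, and an NCCL process group; the launcher, environment variables, and the use of whole buffers instead of tile views are assumptions, not part of the source above.

# Standalone sketch (assumptions noted above): torchrun sets LOCAL_RANK,
# and the process group uses the NCCL backend.
import os

import torch
import torch.distributed as dist
import torch.distributed._symmetric_memory as symm_mem

local_rank = int(os.environ["LOCAL_RANK"])
device = torch.device("cuda", local_rank)
torch.cuda.set_device(device)
dist.init_process_group("nccl")

# Mirror the benchmark: select the NVSHMEM backend and use the WORLD group.
symm_mem.set_backend("NVSHMEM")
group_name = dist.group.WORLD.group_name

tile_size = 1024
# Symmetric-memory buffers; each rank fills its input with its own rank id.
# (The benchmark passes tile views of a larger allocation; whole buffers are
# used here to keep the sketch short.)
inp = symm_mem.empty(tile_size, tile_size, dtype=torch.float, device=device).fill_(
    dist.get_rank()
)
out = symm_mem.empty(tile_size, tile_size, dtype=torch.float, device=device).fill_(0)

# Reduce the tile onto root rank 0, as in the benchmark's inner loop.
torch.ops.symm_mem.tile_reduce(inp, out, 0, group_name)
torch.cuda.synchronize(device)

dist.destroy_process_group()

In the benchmark itself these steps are wrapped in warmup and timed loops and repeated across tile sizes; the sketch only isolates the allocation and the op call.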