Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 112 additions & 2 deletions fbgemm_gpu/bench/repeat_arange_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
CUDA (optimized): 0.128 ms
Speedup: 4.78x
Throughput: 389.2 M elements/sec

Hardened changes (matching tritonbench port):
- Added input validation for PackedTensorAccessor32 int32 overflow
- Added --export-trace flag for Kineto trace export
- Documented that the PyTorch "reference" uses fbgemm.asynchronous_complete_cumsum
"""

from __future__ import annotations
Expand All @@ -92,6 +97,45 @@
torch.ops.load_library("//deeplearning/fbgemm/fbgemm_gpu:sparse_ops")


# ──────────────────────────────────────────────────────────────────────
# Input validation (backported from tritonbench RepeatArangeBenchmark)
# ──────────────────────────────────────────────────────────────────────

INT32_MAX: int = 2**31 - 1


def _validate_inputs(batch_size: int, max_length: int) -> None:
"""Validate inputs before allocation.

The CUDA kernel ``repeat_arange_kernel`` uses PackedTensorAccessor32
(int32 indexing) for both the lengths input and the output tensor.
We must ensure that:
1. batch_size fits in int32.
2. Worst-case output numel (batch_size * max_length) fits in int32.

This validation was added to match the tritonbench port
(``RepeatArangeBenchmark._validate_inputs``).
"""
if batch_size < 1:
raise ValueError(f"batch_size must be >= 1, got {batch_size}.")
if max_length < 1:
raise ValueError(f"max_length must be >= 1, got {max_length}.")

if batch_size > INT32_MAX:
raise ValueError(
f"batch_size = {batch_size} exceeds int32 max ({INT32_MAX}). "
f"The CUDA kernel uses PackedTensorAccessor32. Reduce batch_size."
)

worst_case_output = batch_size * max_length
if worst_case_output > INT32_MAX:
raise ValueError(
f"Worst-case output numel = batch_size * max_length = "
f"{batch_size} * {max_length} = {worst_case_output} exceeds "
f"int32 max ({INT32_MAX}). Reduce batch_size or max_length."
)


def repeat_arange_pytorch(lengths: torch.Tensor) -> torch.Tensor:
"""
Reference PyTorch implementation.
Expand All @@ -103,6 +147,9 @@ def repeat_arange_pytorch(lengths: torch.Tensor) -> torch.Tensor:
4. subtraction (1 kernel)

Plus multiple intermediate tensor allocations.

NOTE: This calls torch.ops.fbgemm.asynchronous_complete_cumsum — it is
NOT a pure PyTorch reference. It requires fbgemm to be loaded.
"""
offsets = torch.ops.fbgemm.asynchronous_complete_cumsum(lengths)
offsets_without_last, max_len = offsets[:-1], int(offsets[-1])
Expand All @@ -126,6 +173,7 @@ def benchmark_repeat_arange(
dtype: torch.dtype = torch.int64,
device: str = "cuda",
iters: int = 100,
export_trace: bool = False,
) -> None:
"""
Benchmark repeat_arange for a given configuration.
Expand All @@ -136,7 +184,11 @@ def benchmark_repeat_arange(
dtype: Data type for lengths tensor
device: Device to run benchmark on
iters: Number of iterations for timing
export_trace: If True, export a Kineto trace for this configuration
"""
# Validate inputs before allocation (backported from tritonbench port)
_validate_inputs(batch_size, max_length)

# Generate random lengths
if device == "cuda" and not torch.cuda.is_available():
logger.warning("CUDA not available, skipping GPU benchmark")
Expand Down Expand Up @@ -189,6 +241,40 @@ def benchmark_repeat_arange(
print(f"Throughput (CUDA): {total_elements / time_cuda / 1e6:.2f} M elements/sec")
print(f"{'=' * 70}\n")

# Export Kineto trace if requested
if export_trace:
trace_name = f"repeat_arange_B{batch_size}_L{max_length}"
_export_kineto_trace(lengths, trace_name, device)


def _export_kineto_trace(lengths: torch.Tensor, trace_name: str, device: str) -> None:
"""Export a Kineto trace for the given configuration.

Runs both implementations under torch.profiler and saves to JSON files
that can be loaded in chrome://tracing or Perfetto.
"""
activities = [torch.profiler.ProfilerActivity.CPU] # pyre-fixme[16]
if device != "cpu" and torch.cuda.is_available():
activities.append(torch.profiler.ProfilerActivity.CUDA) # pyre-fixme[16]

impls = [("pytorch", repeat_arange_pytorch)]
if device != "cpu":
impls.append(("cuda", repeat_arange_cuda))

for impl_name, impl_fn in impls:
with torch.profiler.profile(
activities=activities,
record_shapes=True,
) as prof:
for _ in range(3):
impl_fn(lengths)
if device != "cpu" and torch.cuda.is_available():
torch.cuda.synchronize()

filename = f"{trace_name}_{impl_name}.json"
prof.export_chrome_trace(filename)
print(f"Exported Kineto trace: {filename}")


@click.group()
def cli() -> None:
Expand Down Expand Up @@ -221,12 +307,18 @@ def cli() -> None:
default=False,
help="Use manual seed for reproduction.",
)
@click.option(
"--export-trace/--no-export-trace",
default=False,
help="Export Kineto profiler traces for each configuration.",
)
def bench_repeat_arange(
batch_sizes: str,
max_lengths: str,
device: str,
iters: int,
manual_seed: bool,
export_trace: bool,
) -> None:
"""
Run repeat_arange benchmarks across different configurations.
Expand All @@ -252,6 +344,7 @@ def bench_repeat_arange(
max_length=max_length,
device=device,
iters=iters,
export_trace=export_trace,
)


Expand All @@ -266,7 +359,14 @@ def bench_repeat_arange(
default=False,
help="Use manual seed for reproduction.",
)
def bench_repeat_arange_quick(device: str, manual_seed: bool) -> None:
@click.option(
"--export-trace/--no-export-trace",
default=False,
help="Export Kineto profiler traces for each configuration.",
)
def bench_repeat_arange_quick(
device: str, manual_seed: bool, export_trace: bool
) -> None:
"""
Quick benchmark with representative configurations.
"""
Expand All @@ -292,6 +392,7 @@ def bench_repeat_arange_quick(device: str, manual_seed: bool) -> None:
max_length=max_length,
device=device,
iters=100,
export_trace=export_trace,
)


Expand All @@ -306,7 +407,14 @@ def bench_repeat_arange_quick(device: str, manual_seed: bool) -> None:
default=False,
help="Use manual seed for reproduction.",
)
def bench_repeat_arange_scaling(device: str, manual_seed: bool) -> None:
@click.option(
"--export-trace/--no-export-trace",
default=False,
help="Export Kineto profiler traces for each configuration.",
)
def bench_repeat_arange_scaling(
device: str, manual_seed: bool, export_trace: bool
) -> None:
"""
Benchmark scaling behavior with increasing batch sizes.
"""
Expand All @@ -326,6 +434,7 @@ def bench_repeat_arange_scaling(device: str, manual_seed: bool) -> None:
max_length=100,
device=device,
iters=50,
export_trace=export_trace,
)

# Test scaling with sequence length
Expand All @@ -336,6 +445,7 @@ def bench_repeat_arange_scaling(device: str, manual_seed: bool) -> None:
max_length=max_length,
device=device,
iters=50,
export_trace=export_trace,
)


Expand Down
Loading