Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
*.DS_Store
__pycache__
*.so
checkpoints
checkpoints
*benchmark_results
17 changes: 17 additions & 0 deletions torch_harmonics/benchmark/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from torch_harmonics.benchmark.benchmark import (
BenchmarkABC,
BenchmarkResult,
get_benchmarks,
register_benchmark,
)
from torch_harmonics.benchmark.timer import (
CPUTimer,
CUDATimer,
NullTimer,
Timer,
TimerResult,
)

# Import to trigger registration of built-in benchmarks.
import torch_harmonics.benchmark.sht # noqa: F401
import torch_harmonics.benchmark.disco # noqa: F401
6 changes: 6 additions & 0 deletions torch_harmonics/benchmark/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Entry point for ``python -m torch_harmonics.benchmark``."""
import sys

import torch_harmonics.benchmark  # noqa: F401 — triggers benchmark registration
from torch_harmonics.benchmark.run import cli

# Propagate the CLI's return value as the process exit status.
sys.exit(cli())
141 changes: 141 additions & 0 deletions torch_harmonics/benchmark/benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import abc
import dataclasses
from collections.abc import Callable
from typing import Self

import torch

from torch_harmonics.benchmark.hardware import get_device
from torch_harmonics.benchmark.timer import (
CPUEventPair,
CPUTimer,
CUDATimer,
NullTimer,
Timer,
TimerResult,
)

# Named tensors produced by one benchmark run (e.g. {"outputs": ...}, {"gradient": ...}).
TensorDict = dict[str, torch.Tensor]


@dataclasses.dataclass
class BenchmarkResult:
    """Result of one benchmark phase, as produced by ``BenchmarkABC.run_*_benchmark``.

    The hand-written ``__repr__`` was removed: ``@dataclasses.dataclass`` already
    generates an equivalent repr, and the generated one correctly quotes string
    fields.
    """

    # Which pass was timed: "forward" or "backward".
    phase: str
    # String form of the device the benchmark ran on (see get_device()).
    device: str
    # Hierarchical timer measurements collected during the timed loop.
    timer: TimerResult
    # Wall-clock time around the whole timed loop, in milliseconds
    # (from CPUEventPair.elapsed_time_ms()).
    cpu_time: float

    def asdict(self) -> dict:
        """Return a plain-dict view of this result (nested dataclasses converted too)."""
        return dataclasses.asdict(self)

    def get_logs(self, max_depth: int) -> dict[str, float]:
        """Flatten phase/device/cpu_time plus the timer's logs (to max_depth) into one dict."""
        logs = {"phase": self.phase, "device": self.device, "cpu_time": self.cpu_time}
        logs.update(self.timer.get_logs(max_depth=max_depth))
        return logs


class BenchmarkABC(abc.ABC):
    """Abstract base class for registered benchmarks.

    Subclasses implement ``new`` (construct all benchmark state once) and the
    ``run_instance_forward`` / ``run_instance_backward`` hooks; the ``run_*_benchmark``
    classmethods drive warmup, timing, and result collection.

    Fix in this revision: the docstrings of ``run_instance_forward`` and
    ``run_instance_backward`` were swapped (forward said "backward pass" and
    vice versa).
    """

    @classmethod
    @abc.abstractmethod
    def new(cls: type[Self]) -> Self:
        """
        Initialize any state needed for the benchmark.
        This will be called once before the benchmark is run.
        """
        pass

    @classmethod
    def _make_timer(cls) -> CUDATimer | CPUTimer:
        """Return a CUDA-event timer when a GPU is available, else a CPU timer."""
        if torch.cuda.is_available():
            return CUDATimer()
        return CPUTimer()

    @classmethod
    def _sync(cls) -> None:
        """Block until queued GPU work finishes, so timings cover all launched kernels."""
        if torch.cuda.is_available():
            torch.cuda.synchronize()

    @classmethod
    def run_forward_benchmark(cls, iters=10, warmup=1) -> BenchmarkResult:
        """Time the forward pass: ``warmup`` untimed iterations, then ``iters`` timed ones."""
        null_timer = NullTimer()
        benchmark = cls.new()
        # Warmup iterations use a NullTimer so they are not recorded.
        for _ in range(warmup):
            benchmark.run_instance_forward(null_timer)
        timer = cls._make_timer()
        cpu_timer = CPUEventPair()
        cpu_timer.record_start()
        for _ in range(iters):
            with timer:
                benchmark.run_instance_forward(timer)
        # Synchronize before stopping the CPU clock so async GPU work is included.
        cls._sync()
        cpu_timer.record_end()
        return BenchmarkResult(
            phase="forward",
            device=str(get_device()),
            timer=timer.result,
            cpu_time=cpu_timer.elapsed_time_ms(),
        )

    @classmethod
    def run_backward_benchmark(cls, iters=10, warmup=1) -> BenchmarkResult:
        """Time the backward pass: one forward run builds the graph, then warmup + timed runs."""
        null_timer = NullTimer()
        benchmark = cls.new()
        # A single forward run is required so backward has a graph to traverse.
        benchmark.run_instance_forward(null_timer)
        for _ in range(warmup):
            benchmark.run_instance_backward(null_timer)
        timer = cls._make_timer()
        cpu_timer = CPUEventPair()
        cpu_timer.record_start()
        for _ in range(iters):
            with timer:
                benchmark.run_instance_backward(timer)
        # Synchronize before stopping the CPU clock so async GPU work is included.
        cls._sync()
        cpu_timer.record_end()
        return BenchmarkResult(
            phase="backward",
            device=str(get_device()),
            timer=timer.result,
            cpu_time=cpu_timer.elapsed_time_ms(),
        )

    @abc.abstractmethod
    def run_instance_forward(self: Self, timer: Timer) -> TensorDict:
        """
        Run the forward pass of the benchmark. This will be called multiple times,
        and should return a TensorDict of results.

        This must not mutate any state on self, since the same instance may be
        used across multiple iterations.
        """
        pass

    @abc.abstractmethod
    def run_instance_backward(self: Self, timer: Timer) -> TensorDict:
        """
        Run the backward pass of the benchmark. This will be called multiple times,
        and should return a TensorDict of results.

        This must not mutate any state on self, since the same instance may be
        used across multiple iterations.
        """
        pass


# Global registry mapping benchmark name -> benchmark class.
_BENCHMARKS: dict[str, type[BenchmarkABC]] = {}


def register_benchmark(name: str) -> Callable[[type[BenchmarkABC]], type[BenchmarkABC]]:
    """Class decorator that registers a BenchmarkABC subclass under ``name``.

    Raises ValueError if ``name`` is already taken.
    """

    def _register(benchmark_cls: type[BenchmarkABC]) -> type[BenchmarkABC]:
        if name in _BENCHMARKS:
            raise ValueError(f"Benchmark with name '{name}' is already registered.")
        _BENCHMARKS[name] = benchmark_cls
        return benchmark_cls

    return _register


def get_benchmarks() -> dict[str, type[BenchmarkABC]]:
    """Return a snapshot of the registry so callers cannot mutate it."""
    return dict(_BENCHMARKS)
71 changes: 71 additions & 0 deletions torch_harmonics/benchmark/disco.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import abc
from typing import Self, final

import torch

from torch_harmonics.benchmark.benchmark import (
BenchmarkABC,
TensorDict,
register_benchmark,
)
from torch_harmonics.benchmark.hardware import get_device, scale_batch_size
from torch_harmonics.benchmark.timer import Timer
from torch_harmonics.disco import DiscreteContinuousConvS2


class DiscreteContinuousConvS2Benchmark(BenchmarkABC):
    """Shared scaffolding for DISCO S2 convolution benchmarks.

    Subclasses only choose a problem size via ``new`` (typically delegating to
    ``new_with_shape``).

    Fixes in this revision:
    - the input ``x`` is now created with ``requires_grad=True``; previously
      ``run_instance_backward`` returned ``{"gradient": self.x.grad}`` which was
      always None because gradients were never tracked for ``x``.
    - ``new_with_shape`` used ``cls.device = get_device()``, mutating shared
      class state; a local variable is sufficient.
    """

    @final
    def __init__(self, conv: DiscreteContinuousConvS2, x: torch.Tensor):
        self.conv = conv
        self.x = x

    @classmethod
    @abc.abstractmethod
    def new(cls) -> "DiscreteContinuousConvS2Benchmark": ...

    @classmethod
    @final
    def new_with_shape(
        cls: type[Self],
        B: int,
        in_channels: int,
        out_channels: int,
        nlat: int,
        nlon: int,
        kernel_shape: int = 3,
    ) -> Self:
        """Build a benchmark instance for a B x in_channels x nlat x nlon input."""
        device = get_device()
        conv = DiscreteContinuousConvS2(
            in_channels=in_channels,
            out_channels=out_channels,
            in_shape=(nlat, nlon),
            out_shape=(nlat, nlon),
            kernel_shape=kernel_shape,
            theta_cutoff=None,
            optimized_kernel=False,
        ).to(device)
        # requires_grad=True so the backward benchmark can report x.grad.
        x = torch.randn(
            B, in_channels, nlat, nlon,
            dtype=torch.float32, device=device, requires_grad=True,
        )
        return cls(conv=conv, x=x)

    @final
    def run_instance_forward(self, timer: Timer) -> TensorDict:
        result = self.conv(self.x)
        # Cache the graph-carrying output so run_instance_backward can reuse it.
        self.output = result
        return {"outputs": result.detach()}

    @final
    def run_instance_backward(self, timer: Timer) -> TensorDict:
        g = torch.randn_like(self.output)
        # retain_graph=True: the same forward graph is backpropagated every iteration.
        self.output.backward(g, retain_graph=True)
        return {"gradient": self.x.grad}


@register_benchmark("disco_conv_s2_torch_1deg")
class DiscreteContinuousConvS2TorchBenchmark1Degree(DiscreteContinuousConvS2Benchmark):
    """DISCO S2 conv benchmark on a 1-degree (180 x 360) grid with 4 channels."""

    @classmethod
    def new(cls) -> "DiscreteContinuousConvS2TorchBenchmark1Degree":
        # Base batch size of 4 is scaled for the current hardware.
        batch = scale_batch_size(4)
        return cls.new_with_shape(
            B=batch,
            in_channels=4,
            out_channels=4,
            nlat=180,
            nlon=360,
        )
75 changes: 75 additions & 0 deletions torch_harmonics/benchmark/hardware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import logging

import torch

logger = logging.getLogger(__name__)

# Batch size scale factors relative to Tesla T4 (the default baseline).
# To add a new GPU, add an entry mapping its device name (as returned by
# torch.cuda.get_device_properties(...).name) to a float scale factor.
# Values > 1.0 mean the GPU is faster than a T4 and can use larger batches;
# values < 1.0 mean it is slower.
_BATCH_SIZE_FACTORS: dict[str, float] = {
    "Tesla T4": 1.0,
}

# Factor used when the GPU is not in _BATCH_SIZE_FACTORS (or no GPU is present).
_DEFAULT_BATCH_SIZE_FACTOR = 1.0

# Process-wide overrides set via set_device() / set_batch_size(); None means auto-detect.
_device: torch.device | None = None
_batch_size_override: int | None = None


def set_batch_size(batch_size: int) -> None:
    """Force every benchmark to use exactly ``batch_size``, skipping hardware scaling."""
    global _batch_size_override

    _batch_size_override = batch_size


def set_device(device: str | torch.device) -> None:
"""Override the device used by all benchmarks."""
global _device
_device = torch.device(device)


def get_device() -> torch.device:
    """Return the device benchmarks should run on.

    Priority: the explicit override from set_device(), then the current CUDA
    device if available, then CPU.
    """
    if _device is not None:
        return _device
    if not torch.cuda.is_available():
        return torch.device("cpu")
    return torch.device("cuda", torch.cuda.current_device())


def get_batch_size_factor() -> float:
    """Return a hardware-dependent scale factor for benchmark batch sizes.

    Benchmarks define a base batch size tuned for a Tesla T4. This function
    returns a multiplier so that benchmarks take a similar wall-clock time
    on other hardware. If the batch size is too small, the GPU will not be fully
    occupied, and the benchmarks cannot be used to tune performance.

    Unknown devices fall back to the T4 default (1.0).
    """
    if not torch.cuda.is_available():
        return _DEFAULT_BATCH_SIZE_FACTOR
    name = torch.cuda.get_device_properties(torch.cuda.current_device()).name
    factor = _BATCH_SIZE_FACTORS.get(name)
    if factor is not None:
        return factor
    # Lazy %-style args: the message is only formatted if the warning is emitted.
    logger.warning(
        "Unknown GPU '%s', using default batch size factor %s. Add an entry to "
        "_BATCH_SIZE_FACTORS in hardware.py to tune for this device.",
        name,
        _DEFAULT_BATCH_SIZE_FACTOR,
    )
    return _DEFAULT_BATCH_SIZE_FACTOR


def scale_batch_size(base: int) -> int:
    """Return the batch size to use for a benchmark.

    If a global override has been set via set_batch_size(), that value is
    returned directly. Otherwise the base is scaled by the hardware factor
    (tuned relative to a Tesla T4). Always returns at least 1.
    """
    override = _batch_size_override
    if override is not None:
        return max(1, override)
    scaled = round(base * get_batch_size_factor())
    return max(1, scaled)
Loading
Loading