@@ -6,6 +6,9 @@ on:
- main
- dev
pull_request:
branches:
- main
- dev

jobs:
unit-tests:
13 changes: 13 additions & 0 deletions examples/matmul_py/task.yml
@@ -7,6 +7,19 @@ files:
- {"name": "reference.py", "source": "reference.py"}
- {"name": "eval.py", "source": "../eval.py"}

milestones:
- {
name: "pytorch",
source: "submission.py",
description: "PyTorch reference implementation as a performance baseline for matmul"
}
- {
name: "triton",
source: "triton_ref.py",
description: "Triton reference implementation as a performance baseline for matmul",
exclude_gpus: ['T4']
}

lang: "py"

description: |
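For orientation, a minimal sketch of what one parsed `milestones` entry plausibly looks like once `make_task_definition` has read `task.yml`. The `Milestone` dataclass and its field names are inferred from how `definition.milestones` is consumed in `admin_cog.py` below (name, code, description, exclude_gpus); they are assumptions, not the actual `libkernelbot.task` definitions.

from dataclasses import dataclass, field

@dataclass
class Milestone:
    # Hypothetical parsed form of one `milestones` entry from task.yml.
    name: str                          # e.g. "triton"
    code: str                          # contents of the referenced source file, e.g. triton_ref.py
    description: str = ""
    exclude_gpus: list[str] = field(default_factory=list)  # e.g. ["T4"]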
127 changes: 127 additions & 0 deletions examples/matmul_py/triton_ref.py
@@ -0,0 +1,127 @@
#!POPCORN leaderboard matmul_py
import triton
import triton.language as tl
import torch
from task import input_t, output_t


@triton.jit
def matmul_kernel(
# Pointers to matrices
a_ptr, b_ptr, c_ptr,
# Matrix dimensions
M, N, K,
# The stride variables represent how much to increase the ptr by when moving by 1
# element in a particular dimension. E.g. `stride_am` is how much to increase `a_ptr`
# by to get the element one row down (A has M rows).
stride_am, stride_ak,
stride_bk, stride_bn,
stride_cm, stride_cn,
# Meta-parameters
BLOCK_SIZE_M: tl.constexpr, BLOCK_SIZE_N: tl.constexpr, BLOCK_SIZE_K: tl.constexpr,
GROUP_SIZE_M: tl.constexpr,
):
"""Kernel for computing the matmul C = A x B.
A has shape (M, K), B has shape (K, N) and C has shape (M, N)
"""
# -----------------------------------------------------------
# Map program ids `pid` to the block of C it should compute.
# This is done in a grouped ordering to promote L2 cache hit rates.
    # See the `L2 Cache Optimizations` section of the Triton matmul tutorial for details.
pid = tl.program_id(axis=0)
num_pid_m = tl.cdiv(M, BLOCK_SIZE_M)
num_pid_n = tl.cdiv(N, BLOCK_SIZE_N)
num_pid_in_group = GROUP_SIZE_M * num_pid_n
group_id = pid // num_pid_in_group
first_pid_m = group_id * GROUP_SIZE_M
group_size_m = min(num_pid_m - first_pid_m, GROUP_SIZE_M)
pid_m = first_pid_m + (pid % group_size_m)
pid_n = (pid % num_pid_in_group) // group_size_m
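    # Worked example (illustrative sizes): with M = N = 1024, BLOCK_SIZE_M = BLOCK_SIZE_N = 128
    # and GROUP_SIZE_M = 8, num_pid_m = num_pid_n = 8 and num_pid_in_group = 64, so pid = 10
    # gives group_id = 0, pid_m = 2, pid_n = 1. Consecutive programs sweep down one block-column
    # of C within an 8-block-row group, so they reuse the same block-column of B and a small
    # working set of A rows from L2.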

# ----------------------------------------------------------
# Create pointers for the first blocks of A and B.
# We will advance this pointer as we move in the K direction
# and accumulate
# `a_ptrs` is a block of [BLOCK_SIZE_M, BLOCK_SIZE_K] pointers
# `b_ptrs` is a block of [BLOCK_SIZE_K, BLOCK_SIZE_N] pointers
    # See the `Pointer Arithmetic` section of the Triton matmul tutorial for details.
offs_am = (pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M)) % M
offs_bn = (pid_n * BLOCK_SIZE_N + tl.arange(0, BLOCK_SIZE_N)) % N
offs_k = tl.arange(0, BLOCK_SIZE_K)
a_ptrs = a_ptr + (offs_am[:, None] * stride_am + offs_k[None, :] * stride_ak)
b_ptrs = b_ptr + (offs_k[:, None] * stride_bk + offs_bn[None, :] * stride_bn)

# -----------------------------------------------------------
# Iterate to compute a block of the C matrix.
# We accumulate into a `[BLOCK_SIZE_M, BLOCK_SIZE_N]` block
# of fp32 values for higher precision.
# `accumulator` will be converted back to fp16 after the loop.
accumulator = tl.zeros((BLOCK_SIZE_M, BLOCK_SIZE_N), dtype=tl.float32)
for k in range(0, tl.cdiv(K, BLOCK_SIZE_K)):
# Load the next block of A and B, generate a mask by checking the K dimension.
# If it is out of bounds, set it to 0.
a = tl.load(a_ptrs, mask=offs_k[None, :] < K - k * BLOCK_SIZE_K, other=0.0)
b = tl.load(b_ptrs, mask=offs_k[:, None] < K - k * BLOCK_SIZE_K, other=0.0)
# We accumulate along the K dimension.
accumulator += tl.dot(a, b)
# Advance the ptrs to the next K block.
a_ptrs += BLOCK_SIZE_K * stride_ak
b_ptrs += BLOCK_SIZE_K * stride_bk
# You can fuse arbitrary activation functions here
# while the accumulator is still in FP32!
c = accumulator.to(tl.float16)

# -----------------------------------------------------------
# Write back the block of the output matrix C with masks.
offs_cm = pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M)
offs_cn = pid_n * BLOCK_SIZE_N + tl.arange(0, BLOCK_SIZE_N)
c_ptrs = c_ptr + stride_cm * offs_cm[:, None] + stride_cn * offs_cn[None, :]
c_mask = (offs_cm[:, None] < M) & (offs_cn[None, :] < N)
tl.store(c_ptrs, c, mask=c_mask)


def triton_matmul(a, b):
# Check constraints.
assert a.shape[1] == b.shape[0], "Incompatible dimensions"
assert a.is_contiguous(), "Matrix A must be contiguous"
assert b.is_contiguous(), "Matrix B must be contiguous"
M, K = a.shape
K, N = b.shape
# Allocate output.
c = torch.empty((M, N), device=a.device, dtype=a.dtype)
# 1D launch kernel where each block gets its own program.
grid = lambda META: (triton.cdiv(M, META['BLOCK_SIZE_M']) * triton.cdiv(N, META['BLOCK_SIZE_N']), )
matmul_kernel[grid](
a, b, c,
M, N, K,
a.stride(0), a.stride(1),
b.stride(0), b.stride(1),
c.stride(0), c.stride(1),
BLOCK_SIZE_M=128, BLOCK_SIZE_N=128, BLOCK_SIZE_K=32,
GROUP_SIZE_M=8,
)
return c


def custom_kernel(data: input_t) -> output_t:
a, b = data
# Convert to torch tensors if they aren't already
if not isinstance(a, torch.Tensor):
a = torch.tensor(a, dtype=torch.float16).cuda()
if not isinstance(b, torch.Tensor):
b = torch.tensor(b, dtype=torch.float16).cuda()

# Ensure tensors are on GPU and contiguous
if not a.is_cuda:
a = a.cuda()
if not b.is_cuda:
b = b.cuda()

a = a.contiguous()
b = b.contiguous()

# Use our custom Triton matmul
result = triton_matmul(a, b)

# Convert back to the expected output format
return result
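A quick sanity check of this reference against plain PyTorch — a minimal sketch, assuming a CUDA-capable GPU and that `task.py` (providing `input_t`/`output_t`) is on the import path so this file can be loaded as `triton_ref`; the shapes below are illustrative only.

import torch
from triton_ref import custom_kernel

a = torch.randn(512, 256, device="cuda", dtype=torch.float16)
b = torch.randn(256, 384, device="cuda", dtype=torch.float16)

out = custom_kernel((a, b))
ref = torch.matmul(a, b)

# fp32 accumulation in the kernel vs. cuBLAS means bit-exact equality is not expected
assert torch.allclose(out, ref, atol=1e-2, rtol=1e-2)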
146 changes: 135 additions & 11 deletions src/kernelbot/cogs/admin_cog.py
@@ -1,3 +1,4 @@
import asyncio
import json
import subprocess
import tempfile
@@ -19,9 +20,9 @@
)
from kernelbot.env import env
from kernelbot.ui.misc import ConfirmationView, DeleteConfirmationModal, GPUSelectionView
from libkernelbot.consts import GitHubGPU, ModalGPU
from libkernelbot.consts import GitHubGPU, ModalGPU, get_gpu_by_name
from libkernelbot.leaderboard_db import LeaderboardDoesNotExist, LeaderboardItem, SubmissionItem
from libkernelbot.task import LeaderboardDefinition, make_task_definition
from libkernelbot.task import LeaderboardDefinition, LeaderboardTask, make_task_definition
from libkernelbot.utils import (
KernelBotError,
setup_logging,
@@ -122,6 +123,10 @@ def __init__(self, bot: "ClusterBot"):
name="set-forum-ids", description="Sets forum IDs"
)(self.set_forum_ids)

self.trigger_milestones = bot.admin_group.command(
name="trigger-milestones", description="Trigger running of milestones"
)(self.trigger_milestones)

self._scheduled_cleanup_temp_users.start()

# --------------------------------------------------------------------------
@@ -162,6 +167,7 @@ async def leaderboard_create_local(
interaction: discord.Interaction,
directory: str,
gpu: Optional[app_commands.Choice[str]],
milestones: Optional[bool] = False,
):
is_admin = await self.admin_check(interaction)
if not is_admin:
@@ -180,20 +186,19 @@ async def leaderboard_create_local(
leaderboard_name = directory.name + "-dev"

# create-local overwrites existing leaderboard
forum_channel = self.bot.get_channel(self.bot.leaderboard_forum_id)
forum_thread = None

with self.bot.leaderboard_db as db:
try:
old_lb = db.get_leaderboard(leaderboard_name)
forum_id = old_lb["forum_id"]
forum_thread = await self.bot.fetch_channel(forum_id)
except LeaderboardDoesNotExist:
pass
db.delete_leaderboard(leaderboard_name, force=True)

# get existing forum thread or create new one
forum_channel = self.bot.get_channel(self.bot.leaderboard_forum_id)
forum_thread = None
if old_lb:
forum_id = old_lb["forum_id"]
forum_thread = await self.bot.fetch_channel(forum_id)

# create new forum thread if none exists
if forum_thread is None:
forum_thread = await forum_channel.create_thread(
name=leaderboard_name,
@@ -216,6 +221,11 @@ async def leaderboard_create_local(
interaction,
f"Leaderboard '{leaderboard_name}' created.",
)
else:
raise KernelBotError(f"Error creating leaderboard '{leaderboard_name}'")

if milestones:
await self._submit_milestones(interaction, leaderboard_name)

def _parse_deadline(self, deadline: str):
# Try parsing with time first
@@ -354,14 +364,24 @@ async def create_leaderboard_in_db(

with self.bot.leaderboard_db as db:
try:
db.create_leaderboard(
lb_id = db.create_leaderboard(
name=leaderboard_name,
deadline=date_value,
definition=definition,
gpu_types=selected_gpus,
creator_id=interaction.user.id,
forum_id=forum_id,
)

# create entry in milestones table.
for milestone in definition.milestones:
db.create_milestone(
lb_id,
milestone.name,
milestone.code,
description=milestone.description,
exclude_gpus=milestone.exclude_gpus,
)
except KernelBotError as e:
await send_discord_message(
interaction,
@@ -371,6 +391,95 @@ async def create_leaderboard_in_db(
return False
return True

async def _submit_milestones(
self, interaction: discord.Interaction, leaderboard_name: str, gpus: Optional[list] = None
):
backend = self.bot.backend

with self.bot.leaderboard_db as db:
leaderboard_item = db.get_leaderboard(leaderboard_name)
milestones = db.get_leaderboard_milestones(leaderboard_item["id"])

task: LeaderboardTask = leaderboard_item["task"]

# ok, submit all that are missing
submit_tasks = []
from kernelbot.discord_reporter import MultiProgressReporterDiscord

reporters = MultiProgressReporterDiscord(interaction)
await reporters.show(f"Milestone runs for {leaderboard_name}")

if gpus is None:
gpus = leaderboard_item["gpu_types"]

for milestone in milestones:
with backend.db as db:
existing_runs = db.get_runs_generic(milestone_id=milestone["id"])
# create tasks
for gpu in gpus:
if gpu in [r["runner"] for r in existing_runs]:
await send_discord_message(
interaction,
f"Skipping {gpu} for {milestone['name']}; milestone run already exists.",
ephemeral=True,
)
continue

if gpu in milestone["exclude_gpus"]:
await send_discord_message(
interaction,
f"Skipping {gpu} for {milestone['name']}; is excluded.",
ephemeral=True,
)
continue

submit_tasks.append(
backend.submit_milestone_run(
milestone,
task,
get_gpu_by_name(gpu),
reporters.add_run(f"Milestone {milestone['name']} on {gpu}"),
)
)

await send_discord_message(
interaction,
f"Submitted {len(submit_tasks)} milestone runs for {len(milestones)} milestones.",
ephemeral=True,
)

# Execute all milestone submissions
await asyncio.gather(*submit_tasks)

@app_commands.describe(
leaderboard_name="Name of Leaderboard",
gpu="Select GPU. Leave empty to run for all GPUs.",
rerun="Force re-running existing milestones.",
)
@app_commands.autocomplete(leaderboard_name=leaderboard_name_autocomplete)
@with_error_handling
async def trigger_milestones(
self,
interaction: discord.Interaction,
leaderboard_name: str,
gpu: Optional[str],
rerun: Optional[bool] = False,
):
if not await self.admin_check(interaction):
await send_discord_message(
interaction, "You do not have permission to trigger milestones.", ephemeral=True
)
return

if rerun:
if gpu is not None:
raise KernelBotError("Cannot specify `rerun` and `gpu` at the same time")
with self.bot.backend.db as db:
db.delete_milestone_runs(db.get_leaderboard_id(leaderboard_name))

await interaction.response.defer(ephemeral=True)
        await self._submit_milestones(
            interaction, leaderboard_name, gpus=[gpu] if gpu is not None else None
        )

@discord.app_commands.describe(leaderboard_name="Name of the leaderboard")
@discord.app_commands.autocomplete(leaderboard_name=leaderboard_name_autocomplete)
@with_error_handling
@@ -698,7 +807,7 @@ async def _create_update_plan( # noqa: C901

return update_list, create_list

async def update_competition(
async def update_competition( # noqa: C901
self, interaction: discord.Interaction, spec_file: Path, force: bool = False
):
try:
@@ -748,6 +857,21 @@ async def update_competition(
entry["name"], self._parse_deadline(entry["deadline"]), task
)
new_lb: LeaderboardItem = db.get_leaderboard(entry["name"])
# delete old milestones
db.delete_milestones(new_lb["id"])
# and (re)-create new ones
for milestone in task.milestones:
db.create_milestone(
new_lb["id"],
milestone.name,
milestone.code,
description=milestone.description,
exclude_gpus=milestone.exclude_gpus,
)

# and finally trigger re-run
if task.milestones:
await self._submit_milestones(interaction, new_lb["name"])

forum_id = new_lb["forum_id"]
try:
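The per-GPU decision in `_submit_milestones` reduces to a small filter: skip a GPU if the milestone already has a run on it, skip it if the milestone excludes it, and submit the rest. A standalone sketch of that logic under assumed dict shapes for the DB rows (the function and field names here are illustrative, not the bot's actual API):

def plan_milestone_runs(milestones, gpus, existing_runs_by_milestone):
    """Return (milestone name, gpu) pairs that still need a milestone run.

    milestones: list of dicts with "id", "name", "exclude_gpus"
    gpus: leaderboard GPU names, e.g. ["T4", "A100"]
    existing_runs_by_milestone: dict mapping milestone id -> set of GPU names already run
    """
    plan = []
    for milestone in milestones:
        done = existing_runs_by_milestone.get(milestone["id"], set())
        for gpu in gpus:
            if gpu in done:
                continue  # a run already exists for this milestone on this GPU
            if gpu in milestone["exclude_gpus"]:
                continue  # this GPU is explicitly excluded for this milestone
            plan.append((milestone["name"], gpu))
    return plan


# Example: the "triton" milestone from task.yml excludes T4, and "pytorch" already ran on T4,
# so only the two A100 runs remain.
milestones = [
    {"id": 1, "name": "pytorch", "exclude_gpus": []},
    {"id": 2, "name": "triton", "exclude_gpus": ["T4"]},
]
print(plan_milestone_runs(milestones, ["T4", "A100"], {1: {"T4"}}))
# -> [('pytorch', 'A100'), ('triton', 'A100')]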