diff --git a/.gitignore b/.gitignore index e0b0d03d..d9a5da5f 100644 --- a/.gitignore +++ b/.gitignore @@ -182,4 +182,8 @@ tools/addlicense tools/.addlicense.lock .vscode/* -.claude \ No newline at end of file +.claude + +# macOS metadata files +.DS_Store +._* \ No newline at end of file diff --git a/benchmarks/bench_alloc/README.md b/benchmarks/bench_alloc/README.md new file mode 100644 index 00000000..44765998 --- /dev/null +++ b/benchmarks/bench_alloc/README.md @@ -0,0 +1,93 @@ +# `KVCacheManager` alloc/free microbenchmark + +Times the `alloc(k) + free(handles)` hot path under three allocator implementations: + +- **Python allocator** — baseline before PR #319. +- **C++ allocator** — PR #319 (`lianghao_c++`): allocator migrated to C++, `cudaMemGetInfo` dropped from `available_size()`, page-grouping moved into a single C++ call, `KVCacheBlock` object pool added. +- **C++ + restored resize** — PR #319 + `fix/pr319-restore-resize`: re-adds the elastic-resize poll and shm-name pin that PR #319 dropped. + +NVIDIA GB10 (aarch64). All latency numbers are per call in μs (lower is better) unless noted. + +For e2e vLLM serving numbers, see [`../bench_layout/README.md`](../bench_layout/README.md). + +## Run + +```bash +python bench_alloc.py +``` + +## Results + +### 1. `available_size()` — the most frequently called allocator function + +The Python path called `cudaMemGetInfo` on every invocation (~6 μs each). The C++ path skips it. + +| | μs/call | +|--:|--:| +| Python | 6.52 | +| C++ | 0.52 | +| C++ + resize | 0.52 | + +**12.5×.** Called once per scheduler step. + +### 2. `group_indices_by_page` — called inside `free()` + +Maps a list of N block indices to their owning pages. Python used a per-element loop + `defaultdict`; C++ replaces it with one call. + +| N | Python | C++ | speedup | +|--:|--:|--:|--:| +| 64 | 3.4 | 1.3 | 2.6× | +| 1024 | 52.6 | 16.8 | 3.1× | +| 16384 | 834 | 292 | 2.9× | + +**~3× across the range.** Restored-resize matches C++ within noise. + +### 3. 
Slow-path alloc — `cuMemMap` per call + +`KVCACHED_MIN/MAX_RESERVED_PAGES=0` forces every alloc to map a fresh 2 MB VMM page. `k` is blocks per alloc; k=128 ≈ one page. + +| k | Python | C++ | C++ + resize | +|--:|--:|--:|--:| +| 128 | 4196 | 4023 | 4354 | +| 1024 | 33028 | 32488 | 34662 | +| 4096 | 134479 | 134430 | 137295 | + +**All within 5%.** The CUDA driver syscall dominates; switching the surrounding code to C++ doesn't help. + +### 4. Multi-thread throughput — Python contention dissolves + +N Python threads, each in a tight `alloc(k=16) + free(h)` loop, `async_sched=True`. Aggregate ops/s (**higher is better**). + +| threads | Python Kops/s | C++ Kops/s | C++ + resize Kops/s | +|--:|--:|--:|--:| +| 1 | 15.1 | 41.2 | 32.5 | +| 4 | 12.0 | 48.6 | 31.6 | +| 8 | 9.1 | 51.5 | 29.1 | + +Python **degrades** under thread count (Python-level contention dominated the old hot path). C++ holds or improves. Restored-resize is flat ~30K — each alloc polls a resize shm descriptor that bare C++ skips. + +(GIL is still held during C++ work, so gains come from shorter critical sections, not real parallelism. Real vLLM uses `async_sched=False` and doesn't exercise this path.) + +### 5. `KVCacheBlock` object pool — C++ only + +Pre-allocated pool of `KVCacheBlock` objects vs `new` per call. The Python baseline has no equivalent pool. + +| N | no-pool | pool | speedup | +|--:|--:|--:|--:| +| 8 | 1.06 | 0.19 | 5.6× | +| 1024 | 147 | 17.4 | 8.5× | +| 4096 | 651 | 67.7 | 9.6× | + +**5-10×**, speedup grows with N. + +## Summary — what the C++ allocator delivers + +- **12.5× on `available_size()`** — eliminates the per-scheduler-step `cudaMemGetInfo` cost. +- **~3× on `group_indices_by_page`** — flat across N from 64 to 16,384. +- **Multi-thread throughput scales** instead of degrading: 8 threads go from 9 Kops/s (Python) to 51 Kops/s (C++). +- **5-10× on `KVCacheBlock` allocation** via the new object pool (no Python equivalent). 
+- Slow-path `cuMemMap` is driver-bound and unaffected by the migration. + +The restored-resize variant retains every gain except multi-thread (~70% of bare C++) because each alloc polls the resize shm descriptor. + +These wins amortise to ~5% on e2e vLLM serving (per-token model forward dominates). The much larger e2e lever is unrelated — see [`../bench_layout/README.md`](../bench_layout/README.md). diff --git a/benchmarks/bench_alloc/bench_alloc.py b/benchmarks/bench_alloc/bench_alloc.py new file mode 100644 index 00000000..318baff8 --- /dev/null +++ b/benchmarks/bench_alloc/bench_alloc.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: Copyright contributors to the kvcached project +# SPDX-License-Identifier: Apache-2.0 +"""Microbench for KVCacheManager.alloc() hot path. + +Run on each branch and compare: + python bench_alloc.py +""" +import time + +import torch + +from kvcached.integration.vllm.interfaces import alloc_kv_cache, init_kvcached, shutdown_kvcached +from kvcached.kv_cache_manager import KVCacheManager +from kvcached.vmm_ops import kv_tensors_created + +TP_RANK, TP_SIZE = 0, 1 +NUM_LAYERS = 16 +BLOCK_SIZE = 16 +NUM_BLOCKS = 65536 +DTYPE = torch.float16 +DEVICE = f"cuda:{TP_RANK}" +KV_SHAPE = (2, NUM_BLOCKS, BLOCK_SIZE, 8, 64) + + +def setup(): + torch.cuda.set_device(TP_RANK) + init_kvcached(tp_rank=TP_RANK, world_size=TP_SIZE, is_worker=True, + async_sched=False) + alloc_kv_cache(kvcache_shape=KV_SHAPE, block_size=BLOCK_SIZE, dtype=DTYPE, + device=DEVICE, num_layers=NUM_LAYERS) + t0 = time.time() + while not kv_tensors_created(): + if time.time() - t0 > 10.0: + raise RuntimeError("KV tensors not created within 10s") + time.sleep(0.05) + return KVCacheManager(num_blocks=NUM_BLOCKS, block_size=BLOCK_SIZE, + cell_size=1024, num_layers=NUM_LAYERS, + world_size=TP_SIZE) + + +def bench_alloc_free(manager, k, iters): + # warm up + for _ in range(100): + h = manager.alloc(k) + manager.free(h) + + t0 = time.perf_counter() + for _ in range(iters): + h = 
manager.alloc(k) + manager.free(h) + elapsed = time.perf_counter() - t0 + per_op_us = elapsed / iters * 1e6 + return per_op_us + + +if __name__ == "__main__": + manager = setup() + print(f"{'k':>6} {'iters':>8} {'us/alloc+free':>16}") + for k, iters in [(1, 50000), (4, 50000), (16, 50000), (64, 20000), + (256, 10000)]: + per_op = bench_alloc_free(manager, k, iters) + print(f"{k:>6} {iters:>8} {per_op:>16.2f}") + shutdown_kvcached() diff --git a/benchmarks/bench_layout/README.md b/benchmarks/bench_layout/README.md new file mode 100644 index 00000000..b23d1fde --- /dev/null +++ b/benchmarks/bench_layout/README.md @@ -0,0 +1,96 @@ +# vLLM e2e + `KVCACHED_CONTIGUOUS_LAYOUT` overhead + +Why kvcached is 30-50% slower than vanilla vLLM by default, and why `KVCACHED_CONTIGUOUS_LAYOUT=false` fixes it. + +For the alloc/free microbench, see [`../bench_alloc/README.md`](../bench_alloc/README.md). + +## Setup + +GB10 (aarch64). `Qwen/Qwen3-0.6B` (28 layers, 8 KV heads, head_dim 128, bf16). `vllm serve --gpu-memory-utilization 0.5 --max-model-len 2048`. Bench: `vllm bench serve` random 512in/128out, 500 prompts, 3 seeds. + +## Run + +```bash +# E2E sweep (vanilla vs kvcached × LAYOUT × reserved pool) +bash run_sweep.sh && python parse_results.py sweep_results/ + +# Kernel-level profile under both layouts +bash run_nsys_layout.sh +python diff_nsys_kernels.py nsys_runs/layout_false.nsys-rep nsys_runs/layout_true.nsys-rep +``` + +Intermediate outputs aren't tracked in git — reproducible from the scripts. + +## 1. The gap + +500 prompts at `rate=inf`: + +| | tput (req/s) | TTFT mean (ms) | TPOT mean (ms) | +|---|--:|--:|--:| +| vanilla | 14.21 | 11575 | 119.3 | +| kvcached (`LAYOUT=true`, default) | 9.87 (-31%) | 16555 | 177.5 | +| kvcached + `LAYOUT=false` | 14.17 (-1%) | 11642 | 119.0 | + +`LAYOUT=false` matches vanilla on every metric, also at `rate=16` (sustained load). The C++ allocator from PR #319 only buys back ~5%; reserve-pool size doesn't help either. 
**It's all the layout.** + +## 2. Where the gap actually is + +### Stride math + +`CONTIGUOUS_LAYOUT=true` lays out KV as `[num_blocks, num_layers, k/v, token, head, dim]` (`interfaces.py:282-289`). When you slice down to one layer, block n→n+1 stride is `num_layers × per_block_bytes`. For Qwen3-0.6B: + +- per-block K+V, one layer = 16·8·128·2 = **64 KB** +- stride under `LAYOUT=true` = 28 × 64 KB = **1.75 MB** (≈ VMM page = 2 MB) +- stride under `LAYOUT=false` = **64 KB** (~32 blocks share a page) + +So under contiguous, every FlashAttention block read lands on its own fresh 2 MB page. Non-contiguous packs 32 blocks per page. The attention kernel can't hide that. + +### nsys per-kernel breakdown + +Same workload as Section 1. Going from `LAYOUT=false → true` adds **+8,043 ms (+34.8%)** total GPU kernel time, all in one kernel: + +| kernel | calls | LAYOUT=false ms | LAYOUT=true ms | Δ | +|---|--:|--:|--:|--:| +| `flash::flash_fwd_splitkv_kernel` (KV-read) | 3948 | 14,666 | 22,879 | **+8,213 (+56%)** | +| `vllm::reshape_and_cache_flash_kernel` (KV-write) | 3948 | 302 | 271 | -32 (-11%) | +| everything else | — | ~8,000 | ~8,000 | ~0 | + +That one kernel exceeds the entire gap. Worth noting: the KV-*write* kernel isn't affected — only the multi-block read path is. Writes are sequential per-position so they never hit the cross-page stride. + +### Scales with working set + +Per-call attention time: + +- 100 prompts: 1163 vs 851 μs (**+37%**) +- 500 prompts: 5795 vs 3715 μs (**+56%**) + +More concurrent requests → larger working set → more distinct 2 MB pages touched → worse TLB/L2 hit rate. `LAYOUT=false` stays flat because 32 blocks share one page. Deeper models (Llama2-7B at 32 layers, Llama3-70B at 80) cross the page boundary even harder. + +## 3. Where `LAYOUT=true` still wins + +Three things to put on the other side of the scale. + +**Hybrid linear / mamba: required.** Mamba state shares the KV buffer and indexes by virtual block across layers. 
`interfaces.py:138` outright refuses non-contiguous for hybrid-linear configs. + +**Init time: ~1.4 s faster at server boot.** Contiguous reserves one big VM range; non-contiguous reserves `num_layers` separate ones. Measured `alloc_kv_cache` (16 layers, 1 GB/layer): 635 ms vs 2055 ms. ~99% of that 1.4 s is `FTensor::init_with_zero_()` mapping the zero-page over the entire VM range — contiguous uses a 64 MB compound page so it makes 1947 `cuMemMap` calls (~325 μs each); non-contiguous uses 2 MB pages and makes 62,304 calls (~33 μs each). CUDA driver per-call overhead is the dominant cost, and bigger pages amortise it better. + +The gap stays roughly flat across `num_layers ∈ {8..80}` (1.3–1.5 s), one-shot at startup. + +**Alloc/free hot path: ~2× faster.** Each page mapping under contiguous = 1 `cuMemMap`; under non-contiguous = `num_layers × (K+V)` FTensor `map()` calls. Cold path (`RESERVED=0`) shows a consistent 2.1× ratio; steady-state at small `k` is similar, collapsing to ~1× at `k=256`. + +### When does the trade-off flip? + +Attention overhead hits every decode step. Startup hits once. For the Section 1 workload: + +- `LAYOUT=true` startup advantage: ~1.4 s +- `LAYOUT=false` throughput advantage: 14.17 vs 9.83 req/s, ≈ 31 ms/req + +Break-even at **~45 requests**. Above that, non-contiguous wins on total wall-clock; below, contiguous's faster boot wins. Deeper models shift the break-even down further. + +So contiguous still wins for: smoke tests, single-shot inference, request-level autoscaling, boot-SLA workloads, hybrid linear/mamba (forced). Everything else: non-contiguous. + +## Summary + +The kvcached default `CONTIGUOUS_LAYOUT=true` costs ~30% e2e throughput on standard MHA/GQA/MLA because every FlashAttention block read crosses a fresh 2 MB VMM page. Flipping to `LAYOUT=false` closes the gap entirely, at the price of ~1.4 s extra startup that's paid off in tens of requests. 
+ +The default should flip to `false` for non-hybrid models; `interfaces.py:138` already handles the hybrid-linear case where contiguous is mandatory. diff --git a/benchmarks/bench_layout/diff_nsys_kernels.py b/benchmarks/bench_layout/diff_nsys_kernels.py new file mode 100644 index 00000000..c80038a0 --- /dev/null +++ b/benchmarks/bench_layout/diff_nsys_kernels.py @@ -0,0 +1,101 @@ +# SPDX-FileCopyrightText: Copyright contributors to the kvcached project +# SPDX-License-Identifier: Apache-2.0 +"""Diff per-kernel GPU time between two nsys traces. + +Usage: + python diff_nsys_kernels.py + +Prints kernels ordered by absolute delta (variant - baseline). Useful for +identifying which kernels regress between KVCACHED_CONTIGUOUS_LAYOUT=true and +false. +""" +import csv +import subprocess +import sys +from collections import defaultdict +from io import StringIO + + +def kernel_sums(nsys_rep_path: str) -> dict: + """Returns {kernel_name: (total_ns, instances)} for the given trace.""" + result = subprocess.run( + [ + "nsys", + "stats", + "--report", + "cuda_gpu_kern_sum", + "--format", + "csv", + "--force-overwrite=true", + nsys_rep_path, + ], + capture_output=True, + text=True, + check=True, + ) + # nsys stats emits header text, then a CSV table. Find the CSV. + text = result.stdout + # Find a line that looks like a header row. 
+ lines = text.splitlines() + start = None + for i, line in enumerate(lines): + if line.startswith("Time (%)") or line.startswith("\"Time (%)"): + start = i + break + if start is None: + raise RuntimeError( + f"No CSV header found in nsys stats output for {nsys_rep_path}.\n" + f"First 500 chars:\n{text[:500]}") + + csv_text = "\n".join(lines[start:]) + reader = csv.DictReader(StringIO(csv_text)) + sums: dict = defaultdict(lambda: [0, 0]) + for row in reader: + name = row["Name"] + total_ns = int(row["Total Time (ns)"]) + instances = int(row["Instances"]) + sums[name][0] += total_ns + sums[name][1] += instances + return {k: tuple(v) for k, v in sums.items()} + + +def main(): + if len(sys.argv) != 3: + print(__doc__) + sys.exit(1) + baseline_path, variant_path = sys.argv[1], sys.argv[2] + print(f"baseline = {baseline_path}", file=sys.stderr) + print(f"variant = {variant_path}", file=sys.stderr) + + base = kernel_sums(baseline_path) + var = kernel_sums(variant_path) + + all_kernels = set(base) | set(var) + rows = [] + for k in all_kernels: + b_ns, b_n = base.get(k, (0, 0)) + v_ns, v_n = var.get(k, (0, 0)) + delta = v_ns - b_ns + rows.append((k, b_ns, b_n, v_ns, v_n, delta)) + + rows.sort(key=lambda r: r[5]) # by delta ascending (negative first = sped up) + + total_b = sum(r[1] for r in rows) + total_v = sum(r[3] for r in rows) + print( + f"\nTotal kernel time: baseline={total_b/1e6:,.1f} ms " + f"variant={total_v/1e6:,.1f} ms " + f"delta={ (total_v-total_b)/1e6:+,.1f} ms ({(total_v-total_b)/total_b*100:+.1f}%)\n" + ) + + print(f"{'kernel':<80} {'base ms':>10} {'var ms':>10} {'delta ms':>10} {'delta %':>8} base_n var_n") + for k, b_ns, b_n, v_ns, v_n, d_ns in rows: + if abs(d_ns) < 1_000_000: # < 1 ms delta, skip noise + continue + pct = (d_ns / b_ns * 100) if b_ns else float("inf") + print( + f"{k[:80]:<80} {b_ns/1e6:>10.2f} {v_ns/1e6:>10.2f} {d_ns/1e6:>+10.2f} {pct:>+7.1f}% {b_n:>6} {v_n:>6}") + + +if __name__ == "__main__": + main() diff --git 
a/benchmarks/bench_layout/parse_results.py b/benchmarks/bench_layout/parse_results.py new file mode 100644 index 00000000..4af7631c --- /dev/null +++ b/benchmarks/bench_layout/parse_results.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright contributors to the kvcached project +# SPDX-License-Identifier: Apache-2.0 + +"""Aggregate sweep results into a markdown table (median across 3 seeds).""" +import glob +import json +import os +import statistics +import sys + + +def median(xs): + return statistics.median(xs) + + +def load_one(path): + with open(path) as f: + return json.load(f) + + +def collect(label, dir_): + files = sorted(glob.glob(os.path.join(dir_, f"{label}.seed*.json"))) + runs = [] + for fp in files: + d = load_one(fp) + if d.get("completed", 0) == 0 and d.get("num_prompts", 0) > 0: + # benchmark crashed or all reqs failed + runs.append({"failed": True, "file": os.path.basename(fp)}) + continue + runs.append({ + "throughput": d.get("request_throughput"), + "mean_ttft": d.get("mean_ttft_ms"), + "p99_ttft": d.get("p99_ttft_ms"), + "mean_tpot": d.get("mean_tpot_ms"), + "p99_tpot": d.get("p99_tpot_ms"), + "completed": d.get("completed"), + }) + return runs + + +def median_of(runs, key): + valid = [r[key] for r in runs if not r.get("failed") and r.get(key) is not None] + if not valid: + return None + return median(valid) + + +def main(dir_): + configs = [ + ("A_vanilla_inf", "vanilla vLLM (no kvcached), rate=inf"), + ("B_kvcached_default_inf", "kvcached default, rate=inf"), + ("C_layout_false_inf", "kvcached + LAYOUT=false, rate=inf"), + ("D_reserved200_inf", "kvcached + RESERVED=200, rate=inf"), + ("E_both_inf", "kvcached + LAYOUT=false + RESERVED=200, rate=inf"), + ("F_vanilla_r16", "vanilla vLLM, rate=16"), + ("G_kvcached_default_r16", "kvcached default, rate=16"), + ("H_best_r16", "kvcached + LAYOUT=false + RESERVED=200, rate=16"), + ] + + print(f"{'config':<55} {'tput':>8} {'mTTFT':>10} {'p99TTFT':>10} {'mTPOT':>8} 
{'p99TPOT':>9} seeds") + rows = [] + for label, desc in configs: + runs = collect(label, dir_) + if not runs: + continue + ok = sum(1 for r in runs if not r.get("failed") and r.get("completed", 0) > 0) + failed = sum(1 for r in runs if r.get("failed") or r.get("completed", 0) == 0) + tput = median_of(runs, "throughput") + mttft = median_of(runs, "mean_ttft") + pttft = median_of(runs, "p99_ttft") + mtpot = median_of(runs, "mean_tpot") + ptpot = median_of(runs, "p99_tpot") + def s(v, d=2): + return f"{v:.{d}f}" if v is not None else "—" + print(f"{desc:<55} {s(tput,2):>8} {s(mttft,1):>10} {s(pttft,1):>10} {s(mtpot,2):>8} {s(ptpot,2):>9} ok={ok} fail={failed}") + rows.append((label, desc, ok, failed, tput, mttft, pttft, mtpot, ptpot)) + + # Markdown table + print() + print("| config | tput (req/s) | mean TTFT (ms) | P99 TTFT (ms) | mean TPOT (ms) | P99 TPOT (ms) |") + print("|---|--:|--:|--:|--:|--:|") + for label, desc, ok, failed, tput, mttft, pttft, mtpot, ptpot in rows: + suffix = f" *(failed in {failed}/{ok+failed})*" if failed else "" + def s(v, d=2): + return f"{v:.{d}f}" if v is not None else "—" + print(f"| {desc}{suffix} | {s(tput)} | {s(mttft,1)} | {s(pttft,1)} | {s(mtpot)} | {s(ptpot)} |") + + +if __name__ == "__main__": + dir_ = sys.argv[1] if len(sys.argv) > 1 else "." + main(dir_) diff --git a/benchmarks/bench_layout/run_kvcached_configs.sh b/benchmarks/bench_layout/run_kvcached_configs.sh new file mode 100755 index 00000000..6153fe59 --- /dev/null +++ b/benchmarks/bench_layout/run_kvcached_configs.sh @@ -0,0 +1,139 @@ +#!/bin/bash +# Run remaining kvcached configs (B–H minus vanilla which is done). 
+ +set -uo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +RESULTS_DIR="$SCRIPT_DIR/sweep_results" +LOG_DIR="$SCRIPT_DIR/sweep_logs" +mkdir -p "$RESULTS_DIR" "$LOG_DIR" + +VLLM="/home/xingqi/miniforge3/envs/kvcached/bin/vllm" +MODEL="Qwen/Qwen3-0.6B" +PORT=12347 +SEEDS=(42 99 7) + +cleanup_server() { + pkill -INT -f "vllm serve Qwen" 2>/dev/null || true + pkill -INT -f "VLLM::EngineCore" 2>/dev/null || true + for i in $(seq 1 30); do + if ! ss -lnt 2>/dev/null | awk '$4 ~ /:'"$PORT"'$/' | grep -q .; then + return 0 + fi + sleep 1 + done + pkill -KILL -f "vllm serve" 2>/dev/null || true + pkill -KILL -f "VLLM::EngineCore" 2>/dev/null || true + sleep 2 +} + +wait_for_ready() { + local log=$1 + for i in $(seq 1 240); do + if curl -fsS "http://localhost:$PORT/v1/models" >/dev/null 2>&1; then + return 0 + fi + if grep -qE "EngineCore failed|Traceback|RuntimeError|FATAL" "$log" 2>/dev/null; then + return 1 + fi + sleep 2 + done + return 1 +} + +run_config() { + local label=$1 + local request_rate=$2 + local env_str=$3 + + echo + echo "==========================================" + echo "[$(date +%H:%M:%S)] Config: $label rate=$request_rate" + echo "Env: $env_str" + echo "==========================================" + + cleanup_server + + local server_log="$LOG_DIR/${label}.server.log" + : > "$server_log" + + # shellcheck disable=SC2086 + env $env_str \ + "$VLLM" serve "$MODEL" \ + --port "$PORT" \ + --gpu-memory-utilization 0.5 \ + --max-model-len 2048 \ + > "$server_log" 2>&1 & + local server_pid=$! + + if ! wait_for_ready "$server_log"; then + echo " FAILED: server didn't come up; tail of log:" + tail -30 "$server_log" + kill -9 $server_pid 2>/dev/null || true + cleanup_server + return 1 + fi + + echo " [$(date +%H:%M:%S)] Server up. Warmup (100 prompts)..." 
+ "$VLLM" bench serve \ + --backend vllm \ + --model "$MODEL" \ + --port "$PORT" \ + --dataset-name random \ + --random-input-len 512 --random-output-len 128 \ + --num-prompts 100 \ + --request-rate "$request_rate" \ + --seed 1 \ + > "$LOG_DIR/${label}.warmup.log" 2>&1 || true + + for seed in "${SEEDS[@]}"; do + echo " [$(date +%H:%M:%S)] seed=$seed..." + "$VLLM" bench serve \ + --backend vllm \ + --model "$MODEL" \ + --port "$PORT" \ + --dataset-name random \ + --random-input-len 512 --random-output-len 128 \ + --num-prompts 500 \ + --request-rate "$request_rate" \ + --seed "$seed" \ + --save-result \ + --result-dir "$RESULTS_DIR" \ + --result-filename "${label}.seed${seed}.json" \ + > "$LOG_DIR/${label}.seed${seed}.log" 2>&1 + # extract key metrics + grep -E "Request throughput|Mean TTFT|Mean TPOT|P99 TTFT|P99 TPOT" "$LOG_DIR/${label}.seed${seed}.log" \ + | sed 's/^/ /' + done + + echo " [$(date +%H:%M:%S)] Tearing down..." + kill -INT $server_pid 2>/dev/null || true + sleep 3 + cleanup_server +} + +# Config B: kvcached default (rate=inf) +run_config "B_kvcached_default_inf" inf \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1" + +# Config C: CONTIGUOUS_LAYOUT=false (rate=inf) +run_config "C_layout_false_inf" inf \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false" + +# Config D: MAX_RESERVED_PAGES=200 (rate=inf) +run_config "D_reserved200_inf" inf \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_MIN_RESERVED_PAGES=50 KVCACHED_MAX_RESERVED_PAGES=200" + +# Config G: kvcached default (rate=16) +run_config "G_kvcached_default_r16" 16 \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1" + +# Config H: kvcached best (rate=16) -- combine layout+reserved if needed +run_config "H_best_r16" 16 \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false KVCACHED_MIN_RESERVED_PAGES=50 KVCACHED_MAX_RESERVED_PAGES=200" + +echo +echo "==========================================" +echo "[$(date +%H:%M:%S)] All configs done." 
+echo "Results in $RESULTS_DIR" +echo "==========================================" diff --git a/benchmarks/bench_layout/run_layout_retest.sh b/benchmarks/bench_layout/run_layout_retest.sh new file mode 100755 index 00000000..27e208ee --- /dev/null +++ b/benchmarks/bench_layout/run_layout_retest.sh @@ -0,0 +1,116 @@ +#!/bin/bash +# Re-run C (CONTIGUOUS_LAYOUT=false) and H (best r16) after the +# kv_cache_manager.py:107 fix. + +set -uo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +RESULTS_DIR="$SCRIPT_DIR/sweep_results" +LOG_DIR="$SCRIPT_DIR/sweep_logs" +mkdir -p "$RESULTS_DIR" "$LOG_DIR" + +VLLM="/home/xingqi/miniforge3/envs/kvcached/bin/vllm" +MODEL="Qwen/Qwen3-0.6B" +PORT=12347 +SEEDS=(42 99 7) + +cleanup_server() { + pkill -INT -f "vllm serve Qwen" 2>/dev/null || true + pkill -INT -f "VLLM::EngineCore" 2>/dev/null || true + for i in $(seq 1 30); do + if ! ss -lnt 2>/dev/null | awk '$4 ~ /:'"$PORT"'$/' | grep -q .; then + return 0 + fi + sleep 1 + done + pkill -KILL -f "vllm serve" 2>/dev/null || true + pkill -KILL -f "VLLM::EngineCore" 2>/dev/null || true + sleep 2 + return 0 +} + +wait_for_ready() { + local log=$1 + for i in $(seq 1 240); do + if curl -fsS "http://localhost:$PORT/v1/models" >/dev/null 2>&1; then + return 0 + fi + if grep -qE "EngineCore failed|Traceback|RuntimeError|FATAL|Engine core initialization failed|invalid argument" "$log" 2>/dev/null; then + return 1 + fi + sleep 2 + done + return 1 +} + +run_config() { + local label=$1 + local request_rate=$2 + local env_str=$3 + + echo + echo "==========================================" + echo "[$(date +%H:%M:%S)] Config: $label rate=$request_rate" + echo "Env: $env_str" + echo "==========================================" + + cleanup_server + + local server_log="$LOG_DIR/${label}.server.log" + : > "$server_log" + + # shellcheck disable=SC2086 + env $env_str \ + "$VLLM" serve "$MODEL" \ + --port "$PORT" \ + --gpu-memory-utilization 0.5 \ + --max-model-len 2048 \ + > "$server_log" 
2>&1 & + local server_pid=$! + + if ! wait_for_ready "$server_log"; then + echo " FAILED: server didn't come up; tail:" + tail -25 "$server_log" + kill -9 $server_pid 2>/dev/null || true + cleanup_server + return 1 + fi + + echo " [$(date +%H:%M:%S)] Server up. Warmup..." + "$VLLM" bench serve \ + --backend vllm --model "$MODEL" --port "$PORT" \ + --dataset-name random --random-input-len 512 --random-output-len 128 \ + --num-prompts 100 --request-rate "$request_rate" --seed 1 \ + > "$LOG_DIR/${label}.warmup.log" 2>&1 || true + + for seed in "${SEEDS[@]}"; do + echo " [$(date +%H:%M:%S)] seed=$seed" + "$VLLM" bench serve \ + --backend vllm --model "$MODEL" --port "$PORT" \ + --dataset-name random --random-input-len 512 --random-output-len 128 \ + --num-prompts 500 --request-rate "$request_rate" --seed "$seed" \ + --save-result --result-dir "$RESULTS_DIR" \ + --result-filename "${label}.seed${seed}.json" \ + > "$LOG_DIR/${label}.seed${seed}.log" 2>&1 + grep -E "Request throughput|Mean TTFT|Mean TPOT|P99 TTFT|P99 TPOT|Successful requests" \ + "$LOG_DIR/${label}.seed${seed}.log" | sed 's/^/ /' + done + + echo " [$(date +%H:%M:%S)] Tearing down..." + kill -INT $server_pid 2>/dev/null || true + sleep 3 + cleanup_server +} + +# Re-run C with the fix +run_config "C_layout_false_inf" inf \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false" + +# Re-run H with the fix +run_config "H_best_r16" 16 \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false KVCACHED_MIN_RESERVED_PAGES=50 KVCACHED_MAX_RESERVED_PAGES=200" + +echo +echo "==========================================" +echo "[$(date +%H:%M:%S)] Re-run done." 
+echo "==========================================" diff --git a/benchmarks/bench_layout/run_ncu_attn.sh b/benchmarks/bench_layout/run_ncu_attn.sh new file mode 100755 index 00000000..e2bda0e6 --- /dev/null +++ b/benchmarks/bench_layout/run_ncu_attn.sh @@ -0,0 +1,128 @@ +#!/bin/bash +# SPDX-FileCopyrightText: Copyright contributors to the kvcached project +# SPDX-License-Identifier: Apache-2.0 +# +# After nsys identifies the regressing kernel(s), this captures detailed +# memory metrics via ncu. Set KERNEL_REGEX to a regex matching the kernel name +# (e.g. "flash_attn|fa_fwd|fa3"). + +set -uo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +OUT_DIR="$SCRIPT_DIR/ncu_runs" +LOG_DIR="$SCRIPT_DIR/ncu_logs" +mkdir -p "$OUT_DIR" "$LOG_DIR" + +VLLM="/home/xingqi/miniforge3/envs/kvcached/bin/vllm" +MODEL="Qwen/Qwen3-0.6B" +PORT=12349 +KERNEL_REGEX="${KERNEL_REGEX:-flash_fwd_splitkv_kernel}" +LAUNCH_SKIP="${LAUNCH_SKIP:-400}" +LAUNCH_COUNT="${LAUNCH_COUNT:-8}" + +cleanup_server() { + pkill -INT -f "vllm serve Qwen" 2>/dev/null || true + pkill -INT -f "VLLM::EngineCore" 2>/dev/null || true + for i in $(seq 1 30); do + if ! 
ss -lnt 2>/dev/null | awk '$4 ~ /:'"$PORT"'$/' | grep -q .; then + return 0 + fi + sleep 1 + done + pkill -KILL -f "vllm serve" 2>/dev/null || true + pkill -KILL -f "VLLM::EngineCore" 2>/dev/null || true + sleep 2 +} + +wait_for_ready() { + local log=$1 + for i in $(seq 1 240); do + if curl -fsS "http://localhost:$PORT/v1/models" >/dev/null 2>&1; then + return 0 + fi + if grep -qE "EngineCore failed|Traceback|RuntimeError|FATAL" "$log" 2>/dev/null; then + return 1 + fi + sleep 2 + done + return 1 +} + +run_one() { + local label=$1 + local env_str=$2 + + echo + echo "==========================================" + echo "[$(date +%H:%M:%S)] ncu: $label" + echo "Env: $env_str" + echo "Kernel regex: $KERNEL_REGEX" + echo "Skip: $LAUNCH_SKIP, Count: $LAUNCH_COUNT" + echo "==========================================" + + cleanup_server + + local server_log="$LOG_DIR/${label}.server.log" + : > "$server_log" + + # shellcheck disable=SC2086 + env $env_str \ + ncu \ + --target-processes all \ + --export "$OUT_DIR/${label}" \ + --force-overwrite \ + --kernel-name "regex:$KERNEL_REGEX" \ + --launch-skip "$LAUNCH_SKIP" \ + --launch-count "$LAUNCH_COUNT" \ + --section MemoryWorkloadAnalysis \ + --section MemoryWorkloadAnalysis_Tables \ + --section LaunchStats \ + --section Occupancy \ + --section SourceCounters \ + --section SpeedOfLight \ + --section SpeedOfLight_RooflineChart \ + --section WarpStateStats \ + "$VLLM" serve "$MODEL" \ + --port "$PORT" \ + --gpu-memory-utilization 0.5 \ + --max-model-len 2048 \ + > "$server_log" 2>&1 & + local server_pid=$! + + if ! wait_for_ready "$server_log"; then + echo " FAILED: server didn't come up; tail of log:" + tail -30 "$server_log" + kill -9 $server_pid 2>/dev/null || true + cleanup_server + return 1 + fi + echo " [$(date +%H:%M:%S)] Server ready" + + # Just push 50 prompts; ncu replays each kernel many times so this will + # take ~5-15 minutes. We don't need throughput here, only kernel metrics. 
+ "$VLLM" bench serve --backend vllm --model "$MODEL" --port "$PORT" \ + --dataset-name random --random-input-len 512 --random-output-len 128 \ + --num-prompts 50 --request-rate inf --seed 42 \ + > "$LOG_DIR/${label}.bench.log" 2>&1 & + local bench_pid=$! + echo " [$(date +%H:%M:%S)] Bench started (pid $bench_pid)" + # Wait until launch-count kernels collected then kill bench + wait "$bench_pid" 2>/dev/null || true + + kill -INT $server_pid 2>/dev/null || true + sleep 5 + cleanup_server + sleep 3 +} + +run_one "layout_true" \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1" + +run_one "layout_false" \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false" + +echo +echo "==========================================" +echo "Reports in $OUT_DIR" +ls -la "$OUT_DIR" | tail -10 +echo "==========================================" diff --git a/benchmarks/bench_layout/run_nsys_layout.sh b/benchmarks/bench_layout/run_nsys_layout.sh new file mode 100755 index 00000000..9c36bda9 --- /dev/null +++ b/benchmarks/bench_layout/run_nsys_layout.sh @@ -0,0 +1,159 @@ +#!/bin/bash +# SPDX-FileCopyrightText: Copyright contributors to the kvcached project +# SPDX-License-Identifier: Apache-2.0 +# +# Launch vllm-serve under nsys with KVCACHED_CONTIGUOUS_LAYOUT in {true,false} +# and drive it with a small bench burst. Captures GPU kernel timeline so we can +# diff per-kernel time between the two layouts. + +set -uo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +OUT_DIR="$SCRIPT_DIR/nsys_runs" +LOG_DIR="$SCRIPT_DIR/nsys_logs" +mkdir -p "$OUT_DIR" "$LOG_DIR" + +VLLM="/home/xingqi/miniforge3/envs/kvcached/bin/vllm" +MODEL="Qwen/Qwen3-0.6B" +PORT=12348 +NUM_PROMPTS=${NUM_PROMPTS:-100} +RATE=${RATE:-inf} +CAPTURE_DURATION=${CAPTURE_DURATION:-90} # seconds +WARMUP_PROMPTS=${WARMUP_PROMPTS:-30} + +cleanup_server() { + pkill -INT -f "vllm serve Qwen" 2>/dev/null || true + pkill -INT -f "VLLM::EngineCore" 2>/dev/null || true + for i in $(seq 1 30); do + if ! 
ss -lnt 2>/dev/null | awk '$4 ~ /:'"$PORT"'$/' | grep -q .; then + return 0 + fi + sleep 1 + done + pkill -KILL -f "vllm serve" 2>/dev/null || true + pkill -KILL -f "VLLM::EngineCore" 2>/dev/null || true + sleep 2 +} + +wait_for_ready() { + local log=$1 + for i in $(seq 1 240); do + if curl -fsS "http://localhost:$PORT/v1/models" >/dev/null 2>&1; then + return 0 + fi + if grep -qE "EngineCore failed|Traceback|RuntimeError|FATAL" "$log" 2>/dev/null; then + return 1 + fi + sleep 2 + done + return 1 +} + +run_one() { + local label=$1 + local env_str=$2 + + echo + echo "==========================================" + echo "[$(date +%H:%M:%S)] nsys profile: $label" + echo "Env: $env_str" + echo "==========================================" + + cleanup_server + + local server_log="$LOG_DIR/${label}.server.log" + : > "$server_log" + + # Launch vllm serve under nsys. We don't constrain duration here -- we'll + # rely on cudaProfilerApi-style capture range below for the steady-state + # decode window. Until we add explicit hooks, just let nsys profile the + # full lifetime and use --capture-range=none + --duration on the bench + # window. Simpler: profile the full run, mark the bench start/end with + # NVTX ranges so we can filter post-hoc. + # + # Using --sample=none keeps overhead modest and CPU samples out. + # --trace=cuda,nvtx,osrt captures kernel launches + NVTX + a bit of OS. + + # shellcheck disable=SC2086 + env $env_str \ + nsys profile \ + --output="$OUT_DIR/${label}" \ + --force-overwrite=true \ + --trace=cuda,nvtx \ + --sample=none \ + --cpuctxsw=none \ + --capture-range=cudaProfilerApi \ + --capture-range-end=stop \ + --gpu-metrics-device=none \ + "$VLLM" serve "$MODEL" \ + --port "$PORT" \ + --gpu-memory-utilization 0.5 \ + --max-model-len 2048 \ + --profiler-config '{"profiler":"cuda","ignore_frontend":true}' \ + > "$server_log" 2>&1 & + local server_pid=$! + + if ! 
wait_for_ready "$server_log"; then + echo " FAILED: server didn't come up; tail of log:" + tail -30 "$server_log" + kill -9 $server_pid 2>/dev/null || true + cleanup_server + return 1 + fi + echo " [$(date +%H:%M:%S)] Server ready" + + # Warmup (untimed, fills caches / triggers compile) + if [[ "$WARMUP_PROMPTS" -gt 0 ]]; then + echo " [$(date +%H:%M:%S)] Warmup ${WARMUP_PROMPTS} prompts..." + "$VLLM" bench serve --backend vllm --model "$MODEL" --port "$PORT" \ + --dataset-name random --random-input-len 512 --random-output-len 128 \ + --num-prompts "$WARMUP_PROMPTS" --request-rate "$RATE" --seed 1 \ + > "$LOG_DIR/${label}.warmup.log" 2>&1 || true + fi + + # Start the profiler window via the cudaProfilerStart side-channel. + # Easiest path: hit vllm's /start_profile endpoint, run the bench, then + # hit /stop_profile. vllm's start_profile uses torch.profiler, which calls + # cudaProfilerStart underneath. This matches nsys --capture-range=cudaProfilerApi. + # + # If /start_profile is not available (vllm too old), we fall back to a + # full-trace capture without --capture-range. + + if curl -fsS -X POST "http://localhost:$PORT/start_profile" >/dev/null 2>&1; then + echo " [$(date +%H:%M:%S)] /start_profile hit. Running bench..." + + "$VLLM" bench serve --backend vllm --model "$MODEL" --port "$PORT" \ + --dataset-name random --random-input-len 512 --random-output-len 128 \ + --num-prompts "$NUM_PROMPTS" --request-rate "$RATE" --seed 42 \ + > "$LOG_DIR/${label}.bench.log" 2>&1 + local bench_rc=$? + + curl -fsS -X POST "http://localhost:$PORT/stop_profile" >/dev/null 2>&1 || true + echo " [$(date +%H:%M:%S)] /stop_profile hit. bench rc=$bench_rc" + else + echo " /start_profile not available; profiler window not bounded. Tearing down." 
+ fi + + # Extract bench summary + grep -E "Request throughput|Mean TTFT|Mean TPOT|Total token throughput|Successful requests" \ + "$LOG_DIR/${label}.bench.log" | sed 's/^/ /' + + kill -INT $server_pid 2>/dev/null || true + sleep 5 + cleanup_server + sleep 3 +} + +# Layout = true (default) +run_one "layout_true" \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1" + +# Layout = false +run_one "layout_false" \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false" + +echo +echo "==========================================" +echo "Traces in $OUT_DIR" +ls -la "$OUT_DIR" | tail -10 +echo "==========================================" diff --git a/benchmarks/bench_layout/run_sweep.sh b/benchmarks/bench_layout/run_sweep.sh new file mode 100755 index 00000000..6e767921 --- /dev/null +++ b/benchmarks/bench_layout/run_sweep.sh @@ -0,0 +1,162 @@ +#!/bin/bash +# Sweep e2e configs for kvcached overhead investigation. +# Each config: start server with specified env, run warmup + 3 seeds, parse, kill. + +set -uo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +RESULTS_DIR="$SCRIPT_DIR/sweep_results" +LOG_DIR="$SCRIPT_DIR/sweep_logs" +mkdir -p "$RESULTS_DIR" "$LOG_DIR" + +VENV_PY="/home/xingqi/miniforge3/envs/kvcached/bin/python" +VLLM="/home/xingqi/miniforge3/envs/kvcached/bin/vllm" + +MODEL="Qwen/Qwen3-0.6B" +PORT=12347 +GPU_MEM_UTIL=0.5 +MAX_MODEL_LEN=2048 + +WARMUP_PROMPTS=100 +NUM_PROMPTS=500 +INPUT_LEN=512 +OUTPUT_LEN=128 +SEEDS=(42 99 7) + +cleanup_server() { + pkill -9 -f "vllm.entrypoints.openai.api_server" 2>/dev/null || true + pkill -9 -f "vllm serve" 2>/dev/null || true + # Wait for port to be free + for i in $(seq 1 30); do + if ! 
# Poll until the vLLM server responds on $PORT, or fail fast when the server
# log shows a fatal startup error. Returns 0 when ready, 1 on crash/timeout
# (240 polls x 2s ~= 8 minutes).
wait_for_server() {
    local log=$1
    for i in $(seq 1 240); do
        if curl -fsS "http://localhost:$PORT/v1/models" >/dev/null 2>&1; then
            return 0
        fi
        # Match the same fatal patterns as run_nsys_layout.sh's
        # wait_for_ready: the previous bare "Error\|Traceback" pattern
        # false-positived on any benign log line containing "Error",
        # aborting healthy startups.
        if grep -qE "EngineCore failed|Traceback|RuntimeError|FATAL" "$log" 2>/dev/null; then
            echo "Server crashed; see $log"
            return 1
        fi
        sleep 2
    done
    return 1
}
+ local out="$RESULTS_DIR/${label}.seed${seed}.json" + "$VLLM" bench serve \ + --backend vllm \ + --model "$MODEL" \ + --port "$PORT" \ + --dataset-name random \ + --random-input-len $INPUT_LEN \ + --random-output-len $OUTPUT_LEN \ + --num-prompts $NUM_PROMPTS \ + --request-rate "$request_rate" \ + --seed "$seed" \ + --save-result \ + --result-dir "$RESULTS_DIR" \ + --result-filename "${label}.seed${seed}.json" \ + > "$LOG_DIR/${label}.seed${seed}.log" 2>&1 + done + + echo " Done. Tearing down server." + kill -INT $server_pid 2>/dev/null || true + sleep 3 + cleanup_server +} + +# ---------------- The actual sweep ---------------- + +# Config A: vanilla vLLM (no kvcached) +run_config "A_vanilla_inf" inf "" "" + +# Config B: kvcached default (fix branch) +run_config "B_kvcached_default_inf" inf \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1" "" + +# Config C: kvcached, CONTIGUOUS_LAYOUT=false +run_config "C_layout_false_inf" inf \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false" "" + +# Config D: kvcached, MAX_RESERVED_PAGES=200 +run_config "D_reserved200_inf" inf \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_MIN_RESERVED_PAGES=50 KVCACHED_MAX_RESERVED_PAGES=200" "" + +# Config E: kvcached, both knobs +run_config "E_both_inf" inf \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false KVCACHED_MIN_RESERVED_PAGES=50 KVCACHED_MAX_RESERVED_PAGES=200" "" + +# Config F: vanilla, request-rate=16 +run_config "F_vanilla_r16" 16 "" "" + +# Config G: kvcached default, request-rate=16 +run_config "G_kvcached_default_r16" 16 \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1" "" + +# Config H: kvcached best, request-rate=16 +run_config "H_best_r16" 16 \ + "ENABLE_KVCACHED=true KVCACHED_AUTOPATCH=1 KVCACHED_CONTIGUOUS_LAYOUT=false KVCACHED_MIN_RESERVED_PAGES=50 KVCACHED_MAX_RESERVED_PAGES=200" "" + +echo +echo "==========================================" +echo "All configs done. 
Results in $RESULTS_DIR" +echo "==========================================" diff --git a/csrc/allocator.cpp b/csrc/allocator.cpp index 8e52817b..9adb2715 100644 --- a/csrc/allocator.cpp +++ b/csrc/allocator.cpp @@ -65,7 +65,7 @@ void FTensorAllocator::init(const std::string &dev_str, size_t page_size, bool contiguous_layout) { std::lock_guard lock(g_allocator_mutex_); if (!g_allocators_.empty()) { - LOGE("FTensorAllocator has been initialized. Re-initializing...") + LOGGER(ERROR, "FTensorAllocator has been initialized. Re-initializing..."); g_allocators_.clear(); } @@ -74,8 +74,10 @@ void FTensorAllocator::init(const std::string &dev_str, size_t page_size, // Validate that page_size is a multiple of 2MB size_t base_size = 2 * 1024 * 1024; // 2MB if (page_size % base_size != 0) { - LOGE("Invalid page size: %zu, must be a multiple of 2MB (2097152 bytes)", - page_size); + LOGGER( + ERROR, + "Invalid page size: %zu, must be a multiple of 2MB (2097152 bytes)", + page_size); abort(); } kPageSize = page_size; @@ -120,8 +122,8 @@ std::vector FTensorAllocator::create_kv_tensors( size_t aligned_size = size; if (size % kPageSize != 0) { aligned_size = ((size + kPageSize - 1) / kPageSize) * kPageSize; - LOGW("Size %zu is not aligned to page size %zu, aligning to %zu", size, - kPageSize, aligned_size); + LOGGER(WARNING, "Size %zu is not aligned to page size %zu, aligning to %zu", + size, kPageSize, aligned_size); } kv_tensor_size_per_layer_ = aligned_size; @@ -151,7 +153,7 @@ bool FTensorAllocator::kv_tensors_created() { bool FTensorAllocator::map_to_kv_tensors(const std::vector &offsets) { std::unique_lock lock(mtx_); if (num_layers_ == 0) { - LOGE("try to map to KV tensors when KV tensors are not created"); + LOGGER(ERROR, "try to map to KV tensors when KV tensors are not created"); return false; } @@ -202,7 +204,8 @@ bool FTensorAllocator::unmap_from_kv_tensors( const std::vector &offsets) { std::unique_lock lock(mtx_); if (num_layers_ == 0) { - LOGE("try to unmap from KV 
tensors when KV tensors are not created"); + LOGGER(ERROR, + "try to unmap from KV tensors when KV tensors are not created"); return false; } diff --git a/csrc/ftensor.cpp b/csrc/ftensor.cpp index 10eba830..e4b4e76f 100644 --- a/csrc/ftensor.cpp +++ b/csrc/ftensor.cpp @@ -75,7 +75,7 @@ bool FTensor::map(offset_t offset) { page_id_t page_id = offset / page_size_; if (mapping_.find(page_id) != mapping_.end()) { - LOGE("Page %ld is already mapped.", page_id); + LOGGER(ERROR, "Page %ld is already mapped.", page_id); return false; } @@ -93,7 +93,7 @@ bool FTensor::unmap(offset_t offset) { page_id_t page_id = offset / page_size_; if (mapping_.find(page_id) == mapping_.end()) { - LOGE("Page %ld is not mapped.", page_id); + LOGGER(ERROR, "Page %ld is not mapped.", page_id); return false; } diff --git a/csrc/inc/cuda_utils.hpp b/csrc/inc/cuda_utils.hpp index 08db6954..5a984d57 100644 --- a/csrc/inc/cuda_utils.hpp +++ b/csrc/inc/cuda_utils.hpp @@ -8,6 +8,70 @@ #include #include +#include +#include +#include + +typedef enum { + FATAL = 0, + ERROR = 1, + WARNING = 2, + INFO = 3, + DEBUG = 4, + VERBOSE = 5, +} log_level_enum_t; + +extern void now_to_string(char *buf, int length); +#ifdef __cplusplus +__attribute__((unused)) static char *logger_level_str[] = { + (char *)"FATAL", (char *)"ERROR", (char *)"WARNING", + (char *)"INFO", (char *)"DEBUG", (char *)"VERBOSE"}; +#else +__attribute__((unused)) static char *logger_level_str[] = { + "FATAL", "ERROR", "WARNING", "INFO", "DEBUG", "VERBOSE"}; +#endif + +// glibc >= 2.30 provides a native gettid() wrapper; only define our own +// syscall-based version on older systems to avoid macro/function conflicts. +#if !defined(__GLIBC__) || !defined(__GLIBC_MINOR__) || (__GLIBC__ < 2) || \ + (__GLIBC__ == 2 && __GLIBC_MINOR__ < 30) +#ifndef SYS_gettid +#error "SYS_gettid unavailable on this system" +#endif +static inline pid_t gettid(void) { return (pid_t)syscall(SYS_gettid); } +#endif + +#define LOGGER(level, format, ...) 
\ + ({ \ + char *_print_level_str = getenv("KVCACHED_LOG_LEVEL"); \ + char time[64]; \ + now_to_string(time, 64); \ + int _print_level = 0; \ + if (_print_level_str == NULL) { \ + _print_level = WARNING; \ + } else if (_print_level_str[0] == 'F') { \ + _print_level = FATAL; \ + } else if (_print_level_str[0] == 'E') { \ + _print_level = ERROR; \ + } else if (_print_level_str[0] == 'W') { \ + _print_level = WARNING; \ + } else if (_print_level_str[0] == 'I') { \ + _print_level = INFO; \ + } else if (_print_level_str[0] == 'D') { \ + _print_level = DEBUG; \ + } else if (_print_level_str[0] == 'V') { \ + _print_level = VERBOSE; \ + } \ + if (level <= _print_level) { \ + fprintf(stderr, \ + "[KVCACHED_MEMORY_POOL][%s][%s]%s:%d [p:%u t:%u]" format "\n", \ + logger_level_str[level], time, __FILE__, __LINE__, \ + (unsigned int)getpid(), (unsigned int)gettid(), ##__VA_ARGS__); \ + } \ + if (level == FATAL) { \ + exit(-1); \ + } \ + }) #define LOGE(format, ...) \ fprintf(stderr, "ERROR: %s:%d: " format "\n", __FILE__, __LINE__, \ diff --git a/csrc/inc/mem_info_tracker.hpp b/csrc/inc/mem_info_tracker.hpp new file mode 100644 index 00000000..176d8040 --- /dev/null +++ b/csrc/inc/mem_info_tracker.hpp @@ -0,0 +1,246 @@ +// SPDX-FileCopyrightText: Copyright contributors to the kvcached project +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cuda_utils.hpp" + +namespace kvcached { + +static constexpr const char *SHM_DIR = "/dev/shm"; + +// Memory info struct stored in shared memory, compatible with Python +// MemInfoStruct. 
// Shared-memory record mirroring the Python-side MemInfoStruct: three
// int64 fields laid out contiguously as
// [total_size, used_size, prealloc_size].
struct MemInfoStruct {
  int64_t total_size = 0;    // total KV-cache budget written at init
  int64_t used_size = 0;     // bytes reported via update_memory_usage()
  int64_t prealloc_size = 0; // reserved-pool bytes reported alongside used

  static constexpr int N_FIELDS = 3;
  static constexpr size_t SHM_SIZE = sizeof(int64_t) * N_FIELDS;

  // Default construction zero-fills all three fields (via the member
  // initializers above), matching the original zeroing constructor.
  MemInfoStruct() = default;
  MemInfoStruct(int64_t total, int64_t used, int64_t prealloc)
      : total_size(total), used_size(used), prealloc_size(prealloc) {}
};
(PROT_READ | PROT_WRITE) : PROT_READ; + mapped_ = mmap(nullptr, size_, prot, MAP_SHARED, fd_, 0); + if (mapped_ == MAP_FAILED) { + mapped_ = nullptr; + flock(fd_, LOCK_UN); + ::close(fd_); + fd_ = -1; + return false; + } + + return true; + } + + void close() { + if (mapped_ != nullptr) { + munmap(mapped_, size_); + mapped_ = nullptr; + } + if (fd_ >= 0) { + flock(fd_, LOCK_UN); + ::close(fd_); + fd_ = -1; + } + } + + void *data() { return mapped_; } + const void *data() const { return mapped_; } + + // Read MemInfoStruct from mmap buffer + MemInfoStruct read_mem_info() const { + MemInfoStruct info; + if (mapped_) { + const int64_t *arr = static_cast(mapped_); + info.total_size = arr[0]; + info.used_size = arr[1]; + info.prealloc_size = arr[2]; + } + return info; + } + + // Write MemInfoStruct to mmap buffer + void write_mem_info(const MemInfoStruct &info) { + if (mapped_) { + int64_t *arr = static_cast(mapped_); + arr[0] = info.total_size; + arr[1] = info.used_size; + arr[2] = info.prealloc_size; + } + } + +private: + static std::string get_ipc_path(const std::string &name) { + if (name.empty()) + return ""; + if (name[0] == '/') + return name; + return std::string(SHM_DIR) + "/" + name; + } + + std::string file_path_; + size_t size_; + LockType lock_type_; + int fd_; + void *mapped_; +}; + +// MemInfoTracker: tracks memory usage info via POSIX shared memory +class MemInfoTracker { +public: + explicit MemInfoTracker(int64_t total_mem_size, int64_t group_id = 0, + const std::string &ipc_name = "") + : ipc_name_(ipc_name), total_mem_size_(total_mem_size) { + if (ipc_name_.empty()) { + std::string base = obtain_default_ipc_name(); + // Non-zero group_id gets a "_g" suffix so multiple pools + // in one process don't share a segment. 
+ if (group_id != 0) { + base += "_g" + std::to_string(group_id); + } + ipc_name_ = base; + } + init_kv_cache_limit(total_mem_size_); + LOGGER(INFO, + "MemInfoTracker initialized: ipc_name=%s, total_mem_size=%ld, " + "group_id=%ld", + ipc_name_.c_str(), total_mem_size_, group_id); + } + + ~MemInfoTracker() { cleanup(); } + + // Update memory usage info in shared memory + void update_memory_usage(int64_t used_size, int64_t prealloc_size) { + RwLockedShm shm(ipc_name_, MemInfoStruct::SHM_SIZE, RwLockedShm::WLOCK); + if (!shm.open()) { + LOGGER(ERROR, "MemInfoTracker: failed to open shm for update: %s", + ipc_name_.c_str()); + return; + } + MemInfoStruct info = shm.read_mem_info(); + info.used_size = used_size; + info.prealloc_size = prealloc_size; + shm.write_mem_info(info); + } + + // Check if resize is needed, returns new mem_size (per layer), or -1 if not + // needed + int64_t check_and_get_resize_target(int64_t current_mem_size, + int64_t num_layers, + int64_t num_kv_buffers = 2) { + RwLockedShm shm(ipc_name_, MemInfoStruct::SHM_SIZE, RwLockedShm::RLOCK); + if (!shm.open()) { + return -1; + } + MemInfoStruct info = shm.read_mem_info(); + int64_t new_mem_size = info.total_size / num_layers / num_kv_buffers; + if (new_mem_size != current_mem_size) { + return new_mem_size; + } + return -1; + } + + const std::string &get_ipc_name() const { return ipc_name_; } + +private: + // Initialize kv cache limit in shared memory + void init_kv_cache_limit(int64_t kv_cache_limit) { + RwLockedShm shm(ipc_name_, MemInfoStruct::SHM_SIZE, RwLockedShm::WLOCK); + if (!shm.open()) { + LOGGER(ERROR, "MemInfoTracker: failed to create shm: %s", + ipc_name_.c_str()); + return; + } + MemInfoStruct info(kv_cache_limit, 0, 0); + shm.write_mem_info(info); + } + + // Cleanup shared memory + void cleanup() { + std::string path = std::string(SHM_DIR) + "/" + ipc_name_; + ::unlink(path.c_str()); + } + + // Get default IPC name (consistent with Python version logic) + static std::string 
obtain_default_ipc_name() { + // Prefer environment variable + const char *env_name = std::getenv("KVCACHED_IPC_NAME"); + if (env_name && env_name[0] != '\0') { + return std::string(env_name); + } + + // Construct name using pgid + pid_t pgid = getpgrp(); + char buf[256]; + snprintf(buf, sizeof(buf), "kvcached_engine_%d", static_cast(pgid)); + return std::string(buf); + } + + std::string ipc_name_; + int64_t total_mem_size_; +}; + +} // namespace kvcached diff --git a/csrc/inc/page_allocator.hpp b/csrc/inc/page_allocator.hpp new file mode 100644 index 00000000..6ffd03d5 --- /dev/null +++ b/csrc/inc/page_allocator.hpp @@ -0,0 +1,165 @@ +// SPDX-FileCopyrightText: Copyright contributors to the kvcached project +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "constants.hpp" +#include "mem_info_tracker.hpp" +#include "page.hpp" + +namespace kvcached { + +// Callback function types for multi-process support +using BroadcastMapCallback = + std::function &)>; +using BroadcastUnmapCallback = + std::function &)>; +using ShouldUseWorkerIpcCallback = std::function; + +// Independent InternalPage class +class InternalPage { +public: + page_id_t page_id; + int64_t page_size; + int64_t start_block; + int64_t end_block; + int64_t num_kv_blocks; + std::vector free_list; + + InternalPage(page_id_t id, int64_t size); + void init(int64_t block_mem_size); + std::vector alloc(int64_t num_blocks = 1); + void free(int64_t block_id); + void free_batch(const std::vector &block_ids); + bool empty() const; + bool full() const; + int64_t num_free_blocks() const; + std::vector get_free_blocks() const; + + static std::pair + get_block_range(page_id_t page_id, int64_t page_size, int64_t block_mem_size); + static int64_t get_num_blocks(int64_t page_size, int64_t block_mem_size); +}; + +class PageAllocator { +public: + PageAllocator(int64_t num_layers, 
int64_t mem_size_per_layer, + int64_t page_size, int64_t world_size = 1, int64_t pp_rank = 0, + bool async_sched = false, bool contiguous_layout = true, + bool enable_page_prealloc = true, int64_t num_kv_buffers = 2, + int64_t group_id = 0); + + ~PageAllocator(); + + // Page allocation and deallocation + std::shared_ptr alloc_page(); + void free_page(page_id_t page_id); + void free_pages(const std::vector &page_ids); + + // Memory management + bool resize(int64_t new_mem_size); + void trim(); + + // Status queries + int64_t get_num_free_pages() const; + int64_t get_num_inuse_pages() const; + int64_t get_num_total_pages() const; + int64_t get_num_reserved_pages() const; + int64_t get_avail_physical_pages() const; + + // Poll the shared-memory MemInfoStruct to see if an external controller + // (e.g. `kvctl limit`) has written a new total_size. Returns the new + // per-layer mem_size if it differs from current_mem_size, otherwise -1. + int64_t check_and_get_resize_target(int64_t current_mem_size) const; + + // Utility functions + page_id_t get_page_id(int64_t block_id, int64_t block_mem_size) const; + + // New method for efficient index grouping + std::unordered_map> + group_indices_by_page(const std::vector &indices, + int64_t block_mem_size) const; + + // Page list management + void reset_free_page_order(); + + // Thread management + void start_prealloc_thread(); + void stop_prealloc_thread(); + + // Callback function setters for multi-process support + void set_broadcast_map_callback(BroadcastMapCallback callback); + void set_broadcast_unmap_callback(BroadcastUnmapCallback callback); + void set_should_use_worker_ipc_callback(ShouldUseWorkerIpcCallback callback); + +private: + // Preallocation thread worker + void prealloc_worker(); + + // Internal methods + void map_pages(const std::vector &page_ids); + void unmap_pages(const std::vector &page_ids); + void update_memory_usage(); + void trigger_preallocation(); + void start_prealloc_thread_internal(); + void 
stop_prealloc_thread_internal(); + bool should_use_worker_ipc() const; + + // Configuration + int64_t num_layers_; + int64_t mem_size_per_layer_; + int64_t page_size_; + int64_t world_size_; + int64_t pp_rank_; + int64_t num_kv_buffers_; + int64_t group_id_; + bool async_sched_; + bool contiguous_layout_; + bool enable_page_prealloc_; + double gpu_utilization_; + + // Memory tracking + int64_t num_free_pages_; + int64_t num_total_pages_; + + // Page lists + std::deque free_page_list_; + std::deque reserved_page_list_; + std::deque reclaimed_page_list_; + + // Preallocation settings + int64_t min_reserved_pages_; + int64_t max_reserved_pages_; + + // Thread management + mutable std::mutex lock_; + std::condition_variable cond_; + std::atomic prealloc_running_; + std::atomic prealloc_needed_; + std::unique_ptr prealloc_thread_; + + // Memory info tracker + int64_t total_memory_size_; + std::unique_ptr mem_info_tracker_; + + // Callback functions for multi-process support + BroadcastMapCallback broadcast_map_callback_; + BroadcastUnmapCallback broadcast_unmap_callback_; + ShouldUseWorkerIpcCallback should_use_worker_ipc_callback_; +}; + +} // namespace kvcached diff --git a/csrc/page_allocator.cpp b/csrc/page_allocator.cpp new file mode 100644 index 00000000..410809aa --- /dev/null +++ b/csrc/page_allocator.cpp @@ -0,0 +1,749 @@ +// SPDX-FileCopyrightText: Copyright contributors to the kvcached project +// SPDX-License-Identifier: Apache-2.0 + +#include "page_allocator.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include "allocator.hpp" +#include "cuda_utils.hpp" +#include "mem_info_tracker.hpp" +#include "torch_utils.hpp" + +namespace kvcached { + +// Constants +constexpr double PREALLOC_THREAD_TIMEOUT = 2.0; // seconds + +// Environment variable based constants +const int64_t MIN_RESERVED_PAGES = []() { + const char *env_val = std::getenv("KVCACHED_MIN_RESERVED_PAGES"); + return env_val ? 
std::atoi(env_val) : 5; +}(); + +const int64_t MAX_RESERVED_PAGES = []() { + const char *env_val = std::getenv("KVCACHED_MAX_RESERVED_PAGES"); + return env_val ? std::atoi(env_val) : 10; +}(); + +const double GPU_UTILIZATION = []() { + const char *env_val = std::getenv("KVCACHED_GPU_UTILIZATION"); + return env_val ? std::atof(env_val) : 0.95; +}(); + +// InternalPage implementation +InternalPage::InternalPage(page_id_t id, int64_t size) + : page_id(id), page_size(size), start_block(0), end_block(0), + num_kv_blocks(0) {} + +void InternalPage::init(int64_t block_mem_size) { + auto range = get_block_range(page_id, page_size, block_mem_size); + start_block = range.first; + end_block = range.second; + num_kv_blocks = end_block - start_block; + free_list.clear(); + for (int64_t i = start_block; i < end_block; ++i) { + free_list.push_back(i); + } +} + +std::vector InternalPage::alloc(int64_t num_blocks) { + if (free_list.size() < static_cast(num_blocks)) { + throw std::runtime_error("Not enough free blocks in page"); + } + + std::vector block_ids; + block_ids.reserve(num_blocks); + for (int64_t i = 0; i < num_blocks; ++i) { + block_ids.push_back(free_list[i]); + } + free_list.erase(free_list.begin(), free_list.begin() + num_blocks); + return block_ids; +} + +void InternalPage::free(int64_t block_id) { free_list.push_back(block_id); } + +void InternalPage::free_batch(const std::vector &block_ids) { + free_list.insert(free_list.end(), block_ids.begin(), block_ids.end()); +} + +bool InternalPage::empty() const { + return free_list.size() == static_cast(num_kv_blocks); +} + +bool InternalPage::full() const { return free_list.empty(); } + +int64_t InternalPage::num_free_blocks() const { + return static_cast(free_list.size()); +} + +std::vector InternalPage::get_free_blocks() const { return free_list; } + +std::pair +InternalPage::get_block_range(page_id_t page_id, int64_t page_size, + int64_t block_mem_size) { + + int64_t start_block = + (page_id * page_size + block_mem_size 
- 1) / block_mem_size; + int64_t end_block = ((page_id + 1) * page_size) / block_mem_size; + return {start_block, end_block}; +} + +int64_t InternalPage::get_num_blocks(int64_t page_size, + int64_t block_mem_size) { + return page_size / block_mem_size; +} + +// PageAllocator implementation +PageAllocator::PageAllocator(int64_t num_layers, int64_t mem_size_per_layer, + int64_t page_size, int64_t world_size, + int64_t pp_rank, bool async_sched, + bool contiguous_layout, bool enable_page_prealloc, + int64_t num_kv_buffers, int64_t group_id) + : num_layers_(num_layers), mem_size_per_layer_(mem_size_per_layer), + page_size_(page_size), world_size_(world_size), pp_rank_(pp_rank), + num_kv_buffers_(num_kv_buffers), group_id_(group_id), + async_sched_(async_sched), contiguous_layout_(contiguous_layout), + enable_page_prealloc_(enable_page_prealloc), + gpu_utilization_(GPU_UTILIZATION), + num_free_pages_(mem_size_per_layer / page_size), + num_total_pages_(mem_size_per_layer / page_size), + min_reserved_pages_(std::min(num_free_pages_, MIN_RESERVED_PAGES)), + max_reserved_pages_(std::min(num_free_pages_, MAX_RESERVED_PAGES)), + prealloc_running_(false), prealloc_needed_(false), + total_memory_size_(mem_size_per_layer * num_layers * num_kv_buffers) { + + // Initialize free page list + for (int64_t i = 0; i < num_free_pages_; ++i) { + free_page_list_.push_back(i); + } + + // Initialize memory info tracker + mem_info_tracker_ = + std::make_unique(total_memory_size_, group_id_); + + std::cout << "Init C++ PageAllocator: " + << "num_layers=" << num_layers << ", " + << "mem_size_per_layer=" << mem_size_per_layer / (1024 * 1024) + << "MB, " + << "total_mem_size=" + << (num_kv_buffers * num_layers * mem_size_per_layer) / + (1024 * 1024) + << "MB, " + << "page_size=" << page_size / (1024 * 1024) << "MB, " + << "world_size=" << world_size << ", " + << "pp_rank=" << pp_rank << ", " + << "async_sched=" << async_sched << ", " + << "contiguous_layout=" << contiguous_layout << ", " + << 
"enable_prealloc=" << enable_page_prealloc << ", " + << "num_kv_buffers=" << num_kv_buffers << ", " + << "group_id=" << group_id << ", " + << "min_reserved_pages=" << min_reserved_pages_ << ", " + << "max_reserved_pages=" << max_reserved_pages_ << std::endl; +} + +PageAllocator::~PageAllocator() { + try { + if (enable_page_prealloc_ && prealloc_thread_) { + stop_prealloc_thread_internal(); + } + } catch (...) { + // Silently ignore exceptions during cleanup + } +} + +std::shared_ptr PageAllocator::alloc_page() { + auto start_time = std::chrono::steady_clock::now(); + + std::unique_lock lock(lock_); + page_id_t page_id = -1; + + while (page_id == -1) { + // Fast path: allocate from reserved pages + if (!reserved_page_list_.empty()) { + page_id = reserved_page_list_.front(); + reserved_page_list_.pop_front(); + num_free_pages_--; + + // Trigger preallocation to refill reserved pool if getting low + if (reserved_page_list_.size() < + static_cast(min_reserved_pages_)) { + prealloc_needed_ = true; + cond_.notify_all(); + } + + update_memory_usage(); + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + LOGGER(DEBUG, "alloc 1 page fast path cost %lu us", duration.count()); + // std::cout << "alloc 1 page fast path cost " << duration.count() << " + // us" << std::endl; + + return std::make_shared(page_id, page_size_); + } + + // Slow path: allocate from free pages + if (!free_page_list_.empty()) { + page_id = free_page_list_.front(); + free_page_list_.pop_front(); + num_free_pages_--; + break; + } + + if (num_free_pages_ <= 0) { + throw std::runtime_error("No free pages left"); + } + + if (!enable_page_prealloc_) { + throw std::runtime_error( + "Inconsistent page allocator state: no free pages available"); + } + + // Wait for background preallocation + cond_.wait(lock); + } + + lock.unlock(); + + try { + map_pages({page_id}); + } catch (const std::exception &e) { + std::lock_guard guard(lock_); + 
free_page_list_.push_front(page_id); + num_free_pages_++; + cond_.notify_all(); + throw std::runtime_error("Failed to map page " + std::to_string(page_id) + + ": " + e.what()); + } + + if (enable_page_prealloc_) { + trigger_preallocation(); + } + + update_memory_usage(); + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + LOGGER(DEBUG, "alloc 1 page slow path cost %lu us", duration.count()); + + return std::make_shared(page_id, page_size_); +} + +void PageAllocator::free_page(page_id_t page_id) { + { + std::lock_guard lock(lock_); + num_free_pages_++; + + if (reserved_page_list_.size() < static_cast(max_reserved_pages_)) { + // Fast path: reserve page + reserved_page_list_.push_back(page_id); + update_memory_usage(); + cond_.notify_all(); + return; + } + } + + // Slow path: free page and unmap (lock released, exception-safe) + unmap_pages({page_id}); + + { + std::lock_guard lock(lock_); + free_page_list_.push_back(page_id); + update_memory_usage(); + cond_.notify_all(); + } +} + +void PageAllocator::free_pages(const std::vector &page_ids) { + auto start_time = std::chrono::steady_clock::now(); + + std::vector pages_to_unmap; + + { + std::lock_guard lock(lock_); + num_free_pages_ += page_ids.size(); + int64_t num_to_reserve = max_reserved_pages_ - reserved_page_list_.size(); + + if (num_to_reserve > 0) { + // Fast path: reserve pages + auto reserve_end = + page_ids.begin() + + std::min(static_cast(num_to_reserve), page_ids.size()); + reserved_page_list_.insert(reserved_page_list_.end(), page_ids.begin(), + reserve_end); + + pages_to_unmap.assign(reserve_end, page_ids.end()); + + if (pages_to_unmap.empty()) { + update_memory_usage(); + cond_.notify_all(); + return; + } + } else { + pages_to_unmap = page_ids; + } + } + + // Slow path: unmap pages (lock released, exception-safe) + unmap_pages(pages_to_unmap); + + { + std::lock_guard lock(lock_); + free_page_list_.insert(free_page_list_.end(), 
pages_to_unmap.begin(), + pages_to_unmap.end()); + update_memory_usage(); + cond_.notify_all(); + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + LOGGER(DEBUG, "free %ld pages cost %lu us", page_ids.size(), + duration.count()); +} + +bool PageAllocator::resize(int64_t new_mem_size) { + int64_t new_num_pages = new_mem_size / page_size_; + + std::vector pages_to_unmap; + + { + std::lock_guard lock(lock_); + + if (new_num_pages < get_num_inuse_pages()) { + return false; + } + + if (new_num_pages == num_total_pages_) { + return true; + } else if (new_num_pages > num_total_pages_) { + int64_t num_to_expand = new_num_pages - num_total_pages_; + + // Reuse previously reclaimed pages first + int64_t num_to_reuse = std::min( + static_cast(reclaimed_page_list_.size()), num_to_expand); + if (num_to_reuse > 0) { + for (int64_t i = 0; i < num_to_reuse; ++i) { + free_page_list_.push_back(reclaimed_page_list_.front()); + reclaimed_page_list_.pop_front(); + } + num_to_expand -= num_to_reuse; + num_free_pages_ += num_to_reuse; + } + + // Allocate new pages if needed + if (num_to_expand > 0) { + for (int64_t i = num_total_pages_; i < num_total_pages_ + num_to_expand; + ++i) { + free_page_list_.push_back(i); + } + num_free_pages_ += num_to_expand; + } + num_total_pages_ = new_num_pages; + update_memory_usage(); + return true; + } else { + // Shrink path + int64_t num_to_reclaim = num_total_pages_ - new_num_pages; + + if (free_page_list_.size() < static_cast(num_to_reclaim)) { + // Need to trim reserved pages first + if (!reserved_page_list_.empty()) { + pages_to_unmap.assign(reserved_page_list_.begin(), + reserved_page_list_.end()); + reserved_page_list_.clear(); + } else { + return false; + } + } else { + // Enough free pages, reclaim directly + for (int64_t i = 0; i < num_to_reclaim; ++i) { + reclaimed_page_list_.push_back(free_page_list_.back()); + free_page_list_.pop_back(); + } + num_free_pages_ -= 
num_to_reclaim; + num_total_pages_ = new_num_pages; + return true; + } + } + } + + // Unmap pages outside the lock (exception-safe) + unmap_pages(pages_to_unmap); + + { + std::lock_guard lock(lock_); + int64_t num_to_reclaim = num_total_pages_ - new_num_pages; + + free_page_list_.insert(free_page_list_.end(), pages_to_unmap.begin(), + pages_to_unmap.end()); + update_memory_usage(); + + if (free_page_list_.size() < static_cast(num_to_reclaim)) { + return false; + } + + for (int64_t i = 0; i < num_to_reclaim; ++i) { + reclaimed_page_list_.push_back(free_page_list_.back()); + free_page_list_.pop_back(); + } + num_free_pages_ -= num_to_reclaim; + num_total_pages_ = new_num_pages; + } + return true; +} + +void PageAllocator::trim() { + std::vector pages_to_unmap; + + { + std::lock_guard lock(lock_); + pages_to_unmap.assign(reserved_page_list_.begin(), + reserved_page_list_.end()); + reserved_page_list_.clear(); + + if (pages_to_unmap.empty()) { + update_memory_usage(); + return; + } + } + + // Unmap pages outside the lock (exception-safe) + unmap_pages(pages_to_unmap); + + { + std::lock_guard lock(lock_); + free_page_list_.insert(free_page_list_.end(), pages_to_unmap.begin(), + pages_to_unmap.end()); + update_memory_usage(); + } +} + +int64_t PageAllocator::get_num_free_pages() const { return num_free_pages_; } + +int64_t PageAllocator::get_num_inuse_pages() const { + return num_total_pages_ - num_free_pages_; +} + +int64_t PageAllocator::get_num_total_pages() const { return num_total_pages_; } + +int64_t PageAllocator::get_num_reserved_pages() const { + std::lock_guard lock(lock_); + return reserved_page_list_.size(); +} + +int64_t PageAllocator::get_avail_physical_pages() const { + size_t avail_phy_mem_size, total_phy_mem_size; + cudaMemGetInfo(&avail_phy_mem_size, &total_phy_mem_size); + + size_t headroom = total_phy_mem_size * (1.0 - gpu_utilization_); + avail_phy_mem_size = + std::max(avail_phy_mem_size - headroom, static_cast(0)); + + // Calculate available pages 
considering layers and KV buffers + int64_t avail_phy_pages = avail_phy_mem_size / page_size_; + int64_t avail_pages_per_layer = + avail_phy_pages / num_layers_ / num_kv_buffers_; + return avail_pages_per_layer; +} + +page_id_t PageAllocator::get_page_id(int64_t block_id, + int64_t block_mem_size) const { + return block_id * block_mem_size / page_size_; +} + +int64_t +PageAllocator::check_and_get_resize_target(int64_t current_mem_size) const { + if (!mem_info_tracker_) { + return -1; + } + return mem_info_tracker_->check_and_get_resize_target( + current_mem_size, num_layers_, num_kv_buffers_); +} + +std::unordered_map> +PageAllocator::group_indices_by_page(const std::vector &indices, + int64_t block_mem_size) const { + + auto start_time = std::chrono::steady_clock::now(); + + std::unordered_map> result; + + // Pre-calculate constants for efficiency + int64_t blocks_per_page = page_size_ / block_mem_size; + + // Reserve space for efficiency + result.reserve(indices.size() / blocks_per_page + 1); + + // Group indices by page_id + for (int64_t idx : indices) { + page_id_t page_id = get_page_id(idx, block_mem_size); + result[page_id].push_back(idx); + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + LOGGER(DEBUG, "C++ group_indices_by_page processed %zu indices in %ld us", + indices.size(), duration.count()); + + return result; +} + +// Callback function setters +void PageAllocator::set_broadcast_map_callback(BroadcastMapCallback callback) { + std::lock_guard lock(lock_); + broadcast_map_callback_ = callback; + LOGGER(INFO, "Broadcast map callback set for PageAllocator (world_size=%ld)", + world_size_); +} + +void PageAllocator::set_broadcast_unmap_callback( + BroadcastUnmapCallback callback) { + std::lock_guard lock(lock_); + broadcast_unmap_callback_ = callback; + LOGGER(INFO, + "Broadcast unmap callback set for PageAllocator (world_size=%ld)", + world_size_); +} + +void 
PageAllocator::set_should_use_worker_ipc_callback( + ShouldUseWorkerIpcCallback callback) { + std::lock_guard lock(lock_); + should_use_worker_ipc_callback_ = callback; + LOGGER(INFO, "Should-use-worker-ipc callback set for PageAllocator"); +} + +void PageAllocator::start_prealloc_thread() { + if (enable_page_prealloc_) { + start_prealloc_thread_internal(); + } +} + +void PageAllocator::stop_prealloc_thread() { + if (enable_page_prealloc_) { + stop_prealloc_thread_internal(); + } +} + +void PageAllocator::prealloc_worker() { + auto start_time = std::chrono::steady_clock::now(); + + while (prealloc_running_) { + std::unique_lock lock(lock_); + + // Wait until preallocation is needed or thread is stopped + while (!prealloc_needed_ && prealloc_running_) { + cond_.wait(lock); + } + + LOGGER(INFO, "prealloc worker triggered..."); + if (!prealloc_running_) { + break; + } + + start_time = std::chrono::steady_clock::now(); + prealloc_needed_ = false; + + int64_t current_reserved = reserved_page_list_.size(); + int64_t to_reserve = std::max(0L, min_reserved_pages_ - current_reserved); + // Only try to reserve up to the available free pages and physical memory + to_reserve = + std::min({to_reserve, static_cast(free_page_list_.size()), + get_avail_physical_pages()}); + + LOGGER(INFO, + "max_reserved_pages: %ld, min_reserved_pages: %ld, " + "current_reserved: %ld, to_reserve: %ld, len(free_page_list): %zu", + max_reserved_pages_, min_reserved_pages_, current_reserved, + to_reserve, free_page_list_.size()); + + if (to_reserve <= 0) { + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + LOGGER(INFO, + "prealloc cost: %ld us, no need to preallocate(to_reserve: %ld).", + duration.count(), to_reserve); + continue; + } + + std::vector pages_to_reserve; + pages_to_reserve.reserve(to_reserve); + + // Get pages from free list + for (int64_t i = 0; i < to_reserve && !free_page_list_.empty(); ++i) { + 
pages_to_reserve.push_back(free_page_list_.front()); + free_page_list_.pop_front(); + } + + lock.unlock(); + + if (!pages_to_reserve.empty()) { + try { + map_pages(pages_to_reserve); + lock.lock(); + reserved_page_list_.insert(reserved_page_list_.end(), + pages_to_reserve.begin(), + pages_to_reserve.end()); + update_memory_usage(); + cond_.notify_all(); + LOGGER(INFO, "Preallocated %ld pages, reserved=%ld", + pages_to_reserve.size(), reserved_page_list_.size()); + } catch (const std::exception &e) { + lock.lock(); + free_page_list_.insert(free_page_list_.begin(), + pages_to_reserve.begin(), + pages_to_reserve.end()); + cond_.notify_all(); + LOGGER(ERROR, "Failed to preallocate %ld pages: %s", + pages_to_reserve.size(), e.what()); + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + LOGGER(INFO, "prealloc cost: %ld us, prealloc %ld pages.", + duration.count(), pages_to_reserve.size()); + } + } +} + +void PageAllocator::map_pages(const std::vector &page_ids) { + std::vector offsets; + offsets.reserve(page_ids.size()); + + if (contiguous_layout_) { + for (page_id_t pid : page_ids) { + offsets.push_back(pid * page_size_ * num_layers_ * num_kv_buffers_); + } + } else { + for (page_id_t pid : page_ids) { + offsets.push_back(pid * page_size_); + } + } + + if ((world_size_ > 1 || should_use_worker_ipc()) && broadcast_map_callback_) { + // Multi-process mode: execute map on all TP workers via broadcast callback + broadcast_map_callback_(world_size_, offsets); + } else { + // Single-process mode: directly call FTensorAllocator + auto allocator = FTensorAllocator::global_allocator(group_id_); + bool success = allocator->map_to_kv_tensors(offsets); + if (!success) { + throw std::runtime_error("Failed to map pages to KV tensors"); + } + } + + LOGGER(INFO, "Mapped %zu pages to KV tensors", page_ids.size()); +} + +void PageAllocator::unmap_pages(const std::vector &page_ids) { + auto start_time = 
std::chrono::steady_clock::now(); + + std::vector offsets; + offsets.reserve(page_ids.size()); + + if (contiguous_layout_) { + for (page_id_t pid : page_ids) { + offsets.push_back(pid * page_size_ * num_layers_ * num_kv_buffers_); + } + } else { + for (page_id_t pid : page_ids) { + offsets.push_back(pid * page_size_); + } + } + + if ((world_size_ > 1 || should_use_worker_ipc()) && + broadcast_unmap_callback_) { + // Multi-process mode: execute unmap on all TP workers via broadcast + // callback + broadcast_unmap_callback_(world_size_, offsets); + } else { + // Need to synchronize CUDA first in async scheduling mode + if (async_sched_) { + CHECK_RT(cudaDeviceSynchronize()); + } + auto allocator = FTensorAllocator::global_allocator(group_id_); + bool success = allocator->unmap_from_kv_tensors(offsets); + if (!success) { + throw std::runtime_error("Failed to unmap pages from KV tensors"); + } + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + LOGGER(INFO, "Unmapped %zu pages from KV tensors, cost: %lu us", + page_ids.size(), duration.count()); +} + +void PageAllocator::update_memory_usage() { + // Calculate currently used physical memory (excluding preallocated pages) + int64_t used_phy_mem_size = + get_num_inuse_pages() * num_layers_ * page_size_ * num_kv_buffers_; + // Calculate physical memory occupied by preallocated pages + int64_t prealloc_phy_mem_size = + static_cast(reserved_page_list_.size()) * num_layers_ * + page_size_ * num_kv_buffers_; + + if (mem_info_tracker_) { + mem_info_tracker_->update_memory_usage(used_phy_mem_size, + prealloc_phy_mem_size); + } +} + +void PageAllocator::reset_free_page_order() { + std::lock_guard lock(lock_); + std::vector sorted_pages(free_page_list_.begin(), + free_page_list_.end()); + std::sort(sorted_pages.begin(), sorted_pages.end()); + free_page_list_.assign(sorted_pages.begin(), sorted_pages.end()); +} + +void 
PageAllocator::trigger_preallocation() { + std::lock_guard lock(lock_); + prealloc_needed_ = true; + cond_.notify_all(); +} + +void PageAllocator::start_prealloc_thread_internal() { + if (!prealloc_thread_) { + prealloc_running_ = true; + prealloc_thread_ = + std::make_unique(&PageAllocator::prealloc_worker, this); + + // Initial preallocation trigger + trigger_preallocation(); + } +} + +void PageAllocator::stop_prealloc_thread_internal() { + if (prealloc_thread_) { + { + std::lock_guard lock(lock_); + prealloc_running_ = false; + cond_.notify_all(); + } + + prealloc_thread_->join(); + prealloc_thread_.reset(); + LOGGER(DEBUG, "Stopped page preallocation thread"); + } +} + +bool PageAllocator::should_use_worker_ipc() const { + if (should_use_worker_ipc_callback_) { + return should_use_worker_ipc_callback_(); + } + return false; +} + +} // namespace kvcached \ No newline at end of file diff --git a/csrc/torch_bindings.cpp b/csrc/torch_bindings.cpp index 28830555..d4b4e987 100644 --- a/csrc/torch_bindings.cpp +++ b/csrc/torch_bindings.cpp @@ -1,13 +1,17 @@ // SPDX-FileCopyrightText: Copyright contributors to the kvcached project // SPDX-License-Identifier: Apache-2.0 +#include +#include #include +#include #include #include #include #include "allocator.hpp" #include "constants.hpp" +#include "page_allocator.hpp" #include "torch_utils.hpp" namespace kvcached { @@ -54,6 +58,118 @@ bool unmap_from_kv_tensors(const std::vector &offsets, return allocator->unmap_from_kv_tensors(offsets); } +// PageAllocator bindings +std::shared_ptr create_page_allocator( + int64_t num_layers, int64_t mem_size_per_layer, int64_t page_size, + int64_t world_size = 1, int64_t pp_rank = 0, bool async_sched = false, + bool contiguous_layout = true, bool enable_page_prealloc = true, + int64_t num_kv_buffers = 2, int64_t group_id = 0) { + + return std::make_shared( + num_layers, mem_size_per_layer, page_size, world_size, pp_rank, + async_sched, contiguous_layout, enable_page_prealloc, 
num_kv_buffers, + group_id); +} + +// PageAllocator method bindings +void page_allocator_start_prealloc_thread( + std::shared_ptr allocator) { + allocator->start_prealloc_thread(); +} + +void page_allocator_stop_prealloc_thread( + std::shared_ptr allocator) { + allocator->stop_prealloc_thread(); +} + +std::shared_ptr +page_allocator_alloc_page(std::shared_ptr allocator) { + return allocator->alloc_page(); +} + +void page_allocator_free_page(std::shared_ptr allocator, + page_id_t page_id) { + allocator->free_page(page_id); +} + +void page_allocator_free_pages(std::shared_ptr allocator, + const std::vector &page_ids) { + allocator->free_pages(page_ids); +} + +bool page_allocator_resize(std::shared_ptr allocator, + int64_t new_mem_size) { + return allocator->resize(new_mem_size); +} + +void page_allocator_trim(std::shared_ptr allocator) { + allocator->trim(); +} + +void page_allocator_reset_free_page_order( + std::shared_ptr allocator) { + allocator->reset_free_page_order(); +} + +int64_t +page_allocator_get_num_free_pages(std::shared_ptr allocator) { + return allocator->get_num_free_pages(); +} + +int64_t +page_allocator_get_num_inuse_pages(std::shared_ptr allocator) { + return allocator->get_num_inuse_pages(); +} + +int64_t +page_allocator_get_num_total_pages(std::shared_ptr allocator) { + return allocator->get_num_total_pages(); +} + +int64_t page_allocator_get_num_reserved_pages( + std::shared_ptr allocator) { + return allocator->get_num_reserved_pages(); +} + +int64_t page_allocator_get_avail_physical_pages( + std::shared_ptr allocator) { + return allocator->get_avail_physical_pages(); +} + +int64_t page_allocator_check_and_get_resize_target( + std::shared_ptr allocator, int64_t current_mem_size) { + return allocator->check_and_get_resize_target(current_mem_size); +} + +void page_allocator_set_broadcast_map_callback( + std::shared_ptr allocator, BroadcastMapCallback callback) { + allocator->set_broadcast_map_callback(callback); +} + +void 
page_allocator_set_broadcast_unmap_callback( + std::shared_ptr allocator, BroadcastUnmapCallback callback) { + allocator->set_broadcast_unmap_callback(callback); +} + +void page_allocator_set_should_use_worker_ipc_callback( + std::shared_ptr allocator, + ShouldUseWorkerIpcCallback callback) { + allocator->set_should_use_worker_ipc_callback(callback); +} + +page_id_t page_allocator_get_page_id(std::shared_ptr allocator, + int64_t block_id, int64_t block_mem_size) { + return allocator->get_page_id(block_id, block_mem_size); +} + +// New function for grouping indices by page +std::unordered_map> +page_allocator_group_indices_by_page(std::shared_ptr allocator, + const std::vector &indices, + int64_t block_mem_size) { + return allocator->group_indices_by_page(indices, block_mem_size); +} + } // namespace kvcached PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { @@ -73,4 +189,61 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { py::arg("offsets"), py::arg("group_id") = 0); m.def("unmap_from_kv_tensors", &kvcached::unmap_from_kv_tensors, "unmap_from_kv_tensors", py::arg("offsets"), py::arg("group_id") = 0); + + // PageAllocator bindings + py::class_>( + m, "PageAllocator") + .def(py::init(&kvcached::create_page_allocator), py::arg("num_layers"), + py::arg("mem_size_per_layer"), py::arg("page_size"), + py::arg("world_size") = 1, py::arg("pp_rank") = 0, + py::arg("async_sched") = false, py::arg("contiguous_layout") = true, + py::arg("enable_page_prealloc") = true, + py::arg("num_kv_buffers") = 2, py::arg("group_id") = 0) + .def("start_prealloc_thread", + &kvcached::page_allocator_start_prealloc_thread) + .def("stop_prealloc_thread", + &kvcached::page_allocator_stop_prealloc_thread) + .def("alloc_page", &kvcached::page_allocator_alloc_page) + .def("free_page", &kvcached::page_allocator_free_page) + .def("free_pages", &kvcached::page_allocator_free_pages) + .def("resize", &kvcached::page_allocator_resize) + .def("trim", &kvcached::page_allocator_trim) + .def("reset_free_page_order", + 
&kvcached::page_allocator_reset_free_page_order) + .def("get_num_free_pages", &kvcached::page_allocator_get_num_free_pages) + .def("get_num_inuse_pages", &kvcached::page_allocator_get_num_inuse_pages) + .def("get_num_total_pages", &kvcached::page_allocator_get_num_total_pages) + .def("get_num_reserved_pages", + &kvcached::page_allocator_get_num_reserved_pages) + .def("get_avail_physical_pages", + &kvcached::page_allocator_get_avail_physical_pages) + .def("check_and_get_resize_target", + &kvcached::page_allocator_check_and_get_resize_target) + .def("get_page_id", &kvcached::page_allocator_get_page_id) + .def("group_indices_by_page", + &kvcached::page_allocator_group_indices_by_page) + .def("set_broadcast_map_callback", + &kvcached::page_allocator_set_broadcast_map_callback) + .def("set_broadcast_unmap_callback", + &kvcached::page_allocator_set_broadcast_unmap_callback) + .def("set_should_use_worker_ipc_callback", + &kvcached::page_allocator_set_should_use_worker_ipc_callback); + + // InternalPage bindings (now as independent class) + py::class_>( + m, "InternalPage") + .def(py::init(), py::arg("page_id"), + py::arg("page_size")) + .def_readonly("page_id", &kvcached::InternalPage::page_id) + .def_readonly("page_size", &kvcached::InternalPage::page_size) + .def("init", &kvcached::InternalPage::init) + .def("alloc", &kvcached::InternalPage::alloc) + .def("free", &kvcached::InternalPage::free) + .def("free_batch", &kvcached::InternalPage::free_batch) + .def("empty", &kvcached::InternalPage::empty) + .def("full", &kvcached::InternalPage::full) + .def("num_free_blocks", &kvcached::InternalPage::num_free_blocks) + .def("get_free_blocks", &kvcached::InternalPage::get_free_blocks) + .def_static("get_block_range", &kvcached::InternalPage::get_block_range) + .def_static("get_num_blocks", &kvcached::InternalPage::get_num_blocks); } diff --git a/csrc/util.cpp b/csrc/util.cpp new file mode 100644 index 00000000..57fceb4b --- /dev/null +++ b/csrc/util.cpp @@ -0,0 +1,41 @@ +// 
SPDX-FileCopyrightText: Copyright contributors to the kvcached project +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include +#include + +#define NSEC_PER_USEC (1000ULL) +#define USEC_PER_SEC (1000000ULL) + +uint64_t timespec_to_us(struct timespec ts) { + return (ts.tv_sec * USEC_PER_SEC + ts.tv_nsec / NSEC_PER_USEC); +} + +uint64_t get_current_timestamp_in_us() { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return timespec_to_us(ts); +} + +void now_to_string(char *buf, int length) { + auto now = std::chrono::system_clock::now(); + auto seconds = std::chrono::system_clock::to_time_t(now); + auto us = std::chrono::duration_cast( + now.time_since_epoch()) % + 1000000; + + std::tm tm_struct = {}; + +#ifdef _WIN32 + localtime_s(&tm_struct, &seconds); +#else + localtime_r(&seconds, &tm_struct); +#endif + + char buffer[64]; + std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm_struct); + + std::snprintf(buf, length, "%s.%06ld", buffer, static_cast(us.count())); +} \ No newline at end of file diff --git a/kvcached/integration/vllm/patches.py b/kvcached/integration/vllm/patches.py index e97c003e..a87e47e3 100644 --- a/kvcached/integration/vllm/patches.py +++ b/kvcached/integration/vllm/patches.py @@ -308,6 +308,7 @@ def __init__( self.num_gpu_blocks = num_gpu_blocks self.enable_kv_cache_events = enable_kv_cache_events self.kv_event_queue = [] # type: ignore[var-annotated] + self.kv_block_pool = [KVCacheBlockClass(i) for i in range(num_gpu_blocks)] from kvcached.integration.vllm.interfaces import get_kv_cache_manager @@ -481,7 +482,12 @@ def get_new_blocks( block_ids = self.kv_cache_manager.alloc(num_blocks) assert block_ids is not None and len(block_ids) == num_blocks - return [KVCacheBlockClass(bid, ref_cnt=1) for bid in block_ids] + blocks = [] + for bid in block_ids: + block = self.kv_block_pool[bid] + block.ref_cnt = 1 + blocks.append(block) + return blocks def touch( self, blocks: list["KVCacheBlock"] | 
tuple[list["KVCacheBlock"], ...] @@ -1208,7 +1214,10 @@ def _reshape_kv_cache_tensors_from_kvcached( self, kv_cache_config, kv_cache_raw_tensors, *args: Any, **kwargs: Any ): import torch - from vllm.utils.torch_utils import get_dtype_size + try: + from vllm.utils.torch_utils import get_dtype_size + except ImportError: + from vllm.utils import get_dtype_size # type: ignore[attr-defined] kv_caches: dict[str, torch.Tensor] = {} diff --git a/kvcached/kv_cache_manager.py b/kvcached/kv_cache_manager.py index f22b4018..f3dc41c6 100644 --- a/kvcached/kv_cache_manager.py +++ b/kvcached/kv_cache_manager.py @@ -9,18 +9,37 @@ - Blocks: Smaller units within pages that are allocated to store KV cache data """ +from __future__ import annotations + import functools +import os import threading import time -from collections import defaultdict -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from kvcached.locks import NoOpLock -from kvcached.page_allocator import Page, PageAllocator from kvcached.tp_ipc_util import broadcast_kv_tensors_created -from kvcached.utils import PAGE_SIZE, SANITY_CHECK, get_kvcached_logger +from kvcached.utils import ( + CONTIGUOUS_LAYOUT, + DEFAULT_IPC_NAME, + PAGE_SIZE, + SANITY_CHECK, + get_kvcached_logger, +) from kvcached.vmm_ops import kv_tensors_created +# The C++ MemInfoTracker derives its shm name from KVCACHED_IPC_NAME or +# falls back to "kvcached_engine_", which does NOT match Python's +# "kvcached__". Pin the env var so both sides agree. +os.environ.setdefault("KVCACHED_IPC_NAME", DEFAULT_IPC_NAME) + +try: + import kvcached.vmm_ops as kvcached_cpp + PageAllocator = kvcached_cpp.PageAllocator + InternalPage: Any = kvcached_cpp.InternalPage +except ImportError as e: + raise ImportError(f"Failed to import kvcached.vmm_ops. Please ensure the C++ extension is built properly. 
err: {e}") + logger = get_kvcached_logger() KV_TENSOR_WAIT_TIMEOUT: float = 10.0 # seconds @@ -91,13 +110,51 @@ def __init__( self.world_size, pp_rank=self.pp_rank, async_sched=async_sched, + contiguous_layout=CONTIGUOUS_LAYOUT, + enable_page_prealloc=True, num_kv_buffers=self.num_kv_buffers, group_id=self.group_id, ) + # Register should_use_worker_ipc callback so C++ PageAllocator + # knows when to use broadcast IPC even with world_size == 1 + # (e.g. vLLM V1 EngineCore + worker in separate processes). + try: + from kvcached.integration.vllm.interfaces import should_use_worker_ipc + self.page_allocator.set_should_use_worker_ipc_callback(should_use_worker_ipc) + use_worker_ipc = should_use_worker_ipc() + except ImportError: + use_worker_ipc = False + + if self.world_size > 1 or use_worker_ipc: + try: + from kvcached.tp_ipc_util import ( + broadcast_map_to_kv_tensors, + broadcast_unmap_from_kv_tensors, + ) + + # Wrap Python functions to match C++ callback signature + def map_callback(world_size: int, offsets: List[int], pp_rank: int = 0, group_id: int = 0) -> None: + """Wrapper for Python broadcast function""" + broadcast_map_to_kv_tensors(world_size, offsets, pp_rank, group_id) + + def unmap_callback(world_size: int, offsets: List[int]) -> None: + """Wrapper for Python broadcast function""" + broadcast_unmap_from_kv_tensors(world_size, offsets, pp_rank, group_id) + + # Set the callbacks in the PageAllocator + self.page_allocator.set_broadcast_map_callback(map_callback) + self.page_allocator.set_broadcast_unmap_callback(unmap_callback) + + logger.info("Set up broadcast callbacks for multi-process (world_size=%d, use_worker_ipc=%s)", + self.world_size, use_worker_ipc) + except ImportError as e: + logger.warning("Failed to import tp_ipc_util module: %s. Broadcast callbacks will not be available.", e) + except Exception as e: + logger.warning("Failed to set up broadcast callbacks: %s. 
Falling back to single-process mode.", e) self.num_avail_blocks = 0 # Only count free blocks in avail_pages - self.avail_pages: Dict[int, Page] = {} - self.full_pages: Dict[int, Page] = {} + self.avail_pages: Dict[int, InternalPage] = {} + self.full_pages: Dict[int, InternalPage] = {} self.reserved_blocks: List[int] = [] self.null_block: Optional[list[int]] = None @@ -182,9 +239,9 @@ def _alloc(self, # finished and then perform the usual capacity check. self._wait_post_init() - new_mem_size = self.page_allocator.mem_info_tracker.check_and_get_resize_target( - self.mem_size, self.num_layers, self.num_kv_buffers) - if new_mem_size is not None: + new_mem_size = self.page_allocator.check_and_get_resize_target( + self.mem_size) + if new_mem_size > 0: self.resize(new_mem_size) if self.available_size() < need_size: @@ -193,7 +250,7 @@ def _alloc(self, return None ret_index = [] - page: Optional[Page] = None + page: Optional[InternalPage] = None remaining_need = need_size @@ -244,11 +301,7 @@ def free(self, indices: List[int]): raise ValueError(f"Freed index {idx} is in " " reserved_blocks, which is not allowed.") - # Group indices by page_id - idx_dict = defaultdict(list) - for idx in indices: - page_id = self.page_allocator.get_page_id(idx, self.block_mem_size) - idx_dict[page_id].append(idx) + idx_dict = self.page_allocator.group_indices_by_page(indices, self.block_mem_size) pages_to_free: List[int] = [] for page_id, idxs in idx_dict.items(): @@ -350,7 +403,7 @@ def available_size(self) -> int: physical_free_pages = self.page_allocator.get_avail_physical_pages( ) + self.page_allocator.get_num_reserved_pages() free_pages = min(virtual_free_pages, physical_free_pages) - blocks_from_free_pages = free_pages * Page.get_num_blocks( + blocks_from_free_pages = free_pages * InternalPage.get_num_blocks( self.page_size, self.block_mem_size) return avail_blocks + blocks_from_free_pages @@ -424,13 +477,13 @@ def clear(self): @synchronized def _get_num_alloced_blocks(self) -> int: # 
Blocks from fully allocated pages - blocks_from_full_pages = len(self.full_pages) * Page.get_num_blocks( + blocks_from_full_pages = len(self.full_pages) * InternalPage.get_num_blocks( self.page_size, self.block_mem_size) # Blocks from partially allocated pages. num_avail_blocks is the number # of free blocks in the partially allocated pages so the number of # allocated blocks is the total number of blocks in the partially # allocated pages minus the number of free blocks. - blocks_from_avail_pages = len(self.avail_pages) * Page.get_num_blocks( + blocks_from_avail_pages = len(self.avail_pages) * InternalPage.get_num_blocks( self.page_size, self.block_mem_size) - self.num_avail_blocks # Blocks from reserved blocks blocks_from_reserved_blocks = len(self.reserved_blocks) diff --git a/kvcached/page_allocator.py b/kvcached/page_allocator.py deleted file mode 100644 index 830c02ea..00000000 --- a/kvcached/page_allocator.py +++ /dev/null @@ -1,592 +0,0 @@ -# SPDX-FileCopyrightText: Copyright contributors to the kvcached project -# SPDX-License-Identifier: Apache-2.0 - -import threading -from collections import deque -from typing import List, Optional, Tuple, cast - -import torch - -from kvcached.locks import ConditionLike, LockLike, NoOpCondition, NoOpLock -from kvcached.mem_info_tracker import MemInfoTracker -from kvcached.tp_ipc_util import broadcast_map_to_kv_tensors, broadcast_unmap_from_kv_tensors -from kvcached.utils import ( - CONTIGUOUS_LAYOUT, - GPU_UTILIZATION, - MAX_RESERVED_PAGES, - MIN_RESERVED_PAGES, - PAGE_PREALLOC_ENABLED, - SANITY_CHECK, - get_kvcached_logger, -) -from kvcached.vmm_ops import map_to_kv_tensors, unmap_from_kv_tensors - -logger = get_kvcached_logger() - -PREALLOC_THREAD_TIMEOUT: float = 2.0 # seconds - - -def _should_use_worker_ipc() -> bool: - try: - from kvcached.integration.vllm.interfaces import should_use_worker_ipc - return should_use_worker_ipc() - except ImportError: - return False - - -class Page: - - def __init__(self, page_id: 
int, page_size: int): - self.page_id = page_id - self.page_size = page_size - - self.start_block: Optional[int] = None - self.end_block: Optional[int] = None - self.num_kv_blocks: Optional[int] = None - self.free_list: List[int] = [] - - def _require_init(self) -> None: - """Raise AssertionError if the page has not been initialised. - """ - assert self.start_block is not None, "Page not initialised" - assert self.end_block is not None, "Page not initialised" - assert self.num_kv_blocks is not None, "Page not initialised" - - def init(self, block_mem_size: int) -> None: - self.start_block, self.end_block = self.get_block_range( - self.page_id, self.page_size, block_mem_size) - - self.num_kv_blocks = self.end_block - self.start_block - self.free_list = list(range(self.start_block, self.end_block)) - - def alloc(self, num_blocks: int = 1) -> List[int]: - self._require_init() - if self.full(): - raise ValueError(f"Page {self.page_id} is already full") - block_ids = self.free_list[:num_blocks] - self.free_list = self.free_list[num_blocks:] - return block_ids - - def free(self, block_id: int) -> None: - self._require_init() - if SANITY_CHECK: - self._sanity_check(block_id) - self.free_list.append(block_id) - - def free_batch(self, block_ids: List[int]) -> None: - self._require_init() - if SANITY_CHECK: - for block_id in block_ids: - self._sanity_check(block_id) - self.free_list.extend(block_ids) - - def empty(self) -> bool: - self._require_init() - return len(self.free_list) == self.num_kv_blocks - - def full(self) -> bool: - self._require_init() - return not self.free_list - - def num_free_blocks(self) -> int: - self._require_init() - return len(self.free_list) - - def get_free_blocks(self) -> List[int]: - self._require_init() - return self.free_list - - def _has_block(self, block_id: int) -> bool: - self._require_init() - return block_id >= cast(int, self.start_block) and block_id < cast( - int, self.end_block) - - def _sanity_check(self, block_id: int) -> None: - 
self._require_init() - if not self._has_block(block_id): - raise ValueError( - f"Page {self.page_id} does not have block {block_id}") - if block_id in self.free_list: - raise ValueError(f"Block {block_id} is already free") - - @staticmethod - def get_block_range(page_id: int, page_size: int, - block_mem_size: int) -> Tuple[int, int]: - """ - Get the block range of a page. - The page contains [start_block, end_block), which handles the case where - page_size is not divisible by block_mem_size. - For example, if page_size = 16 and block_mem_size = 6, the page 0 - contains [0, 2) blocks, and the page 1 contains [3, 5) blocks. - Pages: | 0-16 | 16-32 | - | 0-6 | 6-12 | 12-18 | 18-24 | 24-30 | 30-32 | - Blocks: | 0 | 1 |2| 3 | 4 |5| - """ - start_block = (page_id * page_size + block_mem_size - - 1) // block_mem_size - end_block = ((page_id + 1) * page_size) // block_mem_size - return start_block, end_block - - @staticmethod - def get_num_blocks(page_size: int, block_mem_size: int) -> int: - """ - Calculate the number of blocks that can fit in a page. - This calculation is accurate even when page_size is not divisible by - block_mem_size. - """ - return page_size // block_mem_size - - -class PageAllocator: - - def __init__(self, - num_layers: int, - mem_size_per_layer: int, - page_size: int, - world_size: int = 1, - pp_rank: int = 0, - async_sched: bool = False, - contiguous_layout: bool = CONTIGUOUS_LAYOUT, - enable_page_prealloc: bool = PAGE_PREALLOC_ENABLED, - num_kv_buffers: int = 2, - group_id: int = 0): - """ - Args: - num_layers: Number of layers (for physical memory calculation). - mem_size_per_layer: Memory size per layer per K/V tensor in bytes. - page_size: Page size in bytes. - world_size: Tensor parallel world size within a pipeline stage. - pp_rank: Pipeline parallel rank (for IPC socket namespacing). - async_sched: Whether asynchronous scheduling is enabled. - contiguous_layout: Whether to use contiguous layout. 
- enable_page_prealloc: Whether to enable page preallocation. - num_kv_buffers: Number of KV buffers per layer (2 for MHA K+V, - 1 for MLA combined KV). - group_id: KV cache group identifier for hybrid attention models. - Different groups have independent FTensors and page spaces. - """ - logger.info( - f"Init kvcached KV cache allocator: " - f"num_layers={num_layers}, " - f"mem_size_per_layer={mem_size_per_layer//(1024*1024)}MB, " - f"total_mem_size={num_kv_buffers * num_layers * mem_size_per_layer//(1024*1024)}MB, " - f"page_size={page_size//(1024*1024)}MB, " - f"world_size={world_size}, " - f"async_sched={async_sched}, " - f"contiguous_layout={contiguous_layout}, " - f"enable_prealloc={enable_page_prealloc}") - # WARNING (YIFAN): kvcached_ops.init_kvcached must have been called - # before this. - - self.num_layers = num_layers - self.mem_size_per_layer = mem_size_per_layer - self.page_size = page_size - self.world_size = world_size - self.pp_rank = pp_rank - self.async_sched = async_sched - self.contiguous_layout = contiguous_layout - self.num_kv_buffers = num_kv_buffers - self.group_id = group_id - # TODO: make this compatible with engine's memory limit after getting - # better configuration management. 
- self.gpu_utilization = GPU_UTILIZATION - self.num_free_pages = mem_size_per_layer // page_size - self.num_total_pages = mem_size_per_layer // page_size - - self.free_page_list: deque[int] = deque(range(self.num_free_pages)) - - self.min_reserved_pages: int = MIN_RESERVED_PAGES - self.max_reserved_pages: int = MAX_RESERVED_PAGES - self.reserved_page_list: deque[int] = deque() # Fast path allocation - - self.reclaimed_page_list: deque[int] = deque() # Reclaimed page ids - - # Initialize memory info tracker - self.mem_info_tracker = MemInfoTracker( - self.mem_size_per_layer * num_layers * num_kv_buffers, - group_id=group_id) - - # Preallocation thread management - self.enable_page_prealloc: bool = enable_page_prealloc - - self._lock: LockLike - self._cond: ConditionLike - - if self.enable_page_prealloc: - self._lock = threading.RLock() - self._cond = threading.Condition(self._lock) - else: # No preallocation lock and condition are needed. - self._lock = NoOpLock() - self._cond = NoOpCondition(self._lock) - self.prealloc_running: bool = False - self.prealloc_needed: bool = False - self.prealloc_thd: Optional[threading.Thread] = None - - def __del__(self): - try: - if self.enable_page_prealloc and self.prealloc_thd is not None: - self._stop_prealloc_thread(timeout=PREALLOC_THREAD_TIMEOUT) - except Exception: - # Silently ignore exceptions during cleanup - pass - - def start_prealloc_thread(self): - # NOTE: called by KVCacheManager after reserving the null block - if self.enable_page_prealloc: - self._lock = threading.RLock() - self._cond = threading.Condition(self._lock) - self._start_prealloc_thread() - - def alloc_page(self) -> Page: - with self._lock: - page_id: Optional[int] = None - - while page_id is None: - # Fast path: allocate pages with reserved physical memory mapping. 
- if self.reserved_page_list: - page_id = self.reserved_page_list.popleft() - self.num_free_pages -= 1 - - # Trigger preallocation to refill reserved pool if getting low - if len(self.reserved_page_list) < self.min_reserved_pages: - self.prealloc_needed = True - self._cond.notify_all() - - # Update memory usage after fast path allocation - self._update_memory_usage() - return Page(page_id, self.page_size) - - # Slow path: allocate pages with new physical memory mapping. - if self.free_page_list: - page_id = self.free_page_list.popleft() - self.num_free_pages -= 1 - break - - if self.num_free_pages <= 0: - raise ValueError("No free pages left") - - if not self.enable_page_prealloc: - raise RuntimeError( - "Inconsistent page allocator state: no free pages " - "available to allocate") - - # Wait for background preallocation or page freeing to finish. - self._cond.wait() - - assert page_id is not None - - try: - self._map_pages([page_id]) - except Exception as e: - # If mapping fails, return page to free list and restore count - with self._lock: - self.free_page_list.appendleft(page_id) - self.num_free_pages += 1 - self._cond.notify_all() - raise RuntimeError(f"Failed to map page {page_id}: {e}") from e - - if self.enable_page_prealloc: - # Trigger preallocation to refill the pool - self._trigger_preallocation() - - # Update memory usage after mapping pages - self._update_memory_usage() - return Page(page_id, self.page_size) - - def free_page(self, page_id: int) -> None: - with self._lock: - if SANITY_CHECK and (page_id in self.free_page_list - or page_id in self.reserved_page_list): - raise ValueError(f"Page {page_id} is already free or reserved") - - self.num_free_pages += 1 - if len(self.reserved_page_list) < self.max_reserved_pages: - # Fast path: reserve page with its physical memory mapping. 
- self.reserved_page_list.append(page_id) - # Update memory usage after fast path free/reserve - self._update_memory_usage() - self._cond.notify_all() - return - - # Slow path: free page and its physical memory mapping. - self._unmap_pages([page_id]) - with self._lock: - self.free_page_list.append(page_id) - # Update memory usage after unmapping pages - self._update_memory_usage() - self._cond.notify_all() - - def free_pages(self, page_ids: List[int]) -> None: - with self._lock: - if SANITY_CHECK: - for page_id in page_ids: - if (page_id in self.free_page_list - or page_id in self.reserved_page_list): - raise ValueError( - f"Page {page_id} is already free or reserved") - - self.num_free_pages += len(page_ids) - num_to_reserve = self.max_reserved_pages - len( - self.reserved_page_list) - if num_to_reserve > 0: - # Fast path: reserve pages with their physical memory mapping. - self.reserved_page_list.extend(page_ids[:num_to_reserve]) - self._cond.notify_all() - page_ids = page_ids[num_to_reserve:] - - if len(page_ids) == 0: - # Update memory usage after fast path free/reserve - self._update_memory_usage() - return - - # Slow path: free page_ids and their physical memory mapping. - self._unmap_pages(page_ids) - with self._lock: - self.free_page_list.extend(page_ids) - # Update memory usage after unmapping pages - self._update_memory_usage() - self._cond.notify_all() - - def resize(self, new_mem_size: int) -> bool: - new_num_pages = new_mem_size // self.page_size - with self._lock: - if new_num_pages < self.get_num_inuse_pages(): - return False - if new_num_pages == self.num_total_pages: - return True - elif new_num_pages > self.num_total_pages: - num_to_expand = new_num_pages - self.num_total_pages - - # Reuse previously reclaimed pages first. 
- num_to_reuse = min(len(self.reclaimed_page_list), - num_to_expand) - if num_to_reuse > 0: - for _ in range(num_to_reuse): - self.free_page_list.append( - self.reclaimed_page_list.popleft()) - num_to_expand -= num_to_reuse - self.num_free_pages += num_to_reuse - - # Allocate new pages if needed. - if num_to_expand > 0: - new_page_ids = list( - range(self.num_total_pages, - self.num_total_pages + num_to_expand)) - self.free_page_list.extend(new_page_ids) - self.num_free_pages += num_to_expand - self.num_total_pages = new_num_pages - self._update_memory_usage() - else: # new_num_pages < self.num_total_pages and new_num_pages >= num_inuse_pages - num_to_reclaim = self.num_total_pages - new_num_pages - - if len(self.free_page_list) < num_to_reclaim: - # Need to trim reserved pages first - reserved_count = len(self.reserved_page_list) - if reserved_count > 0: - # Move reserved pages back to free list - pages_to_unmap = list(self.reserved_page_list) - self.reserved_page_list.clear() - # Unmap outside the lock to avoid holding it during I/O - try: - self._lock.release() - self._unmap_pages(pages_to_unmap) - finally: - self._lock.acquire() - self.free_page_list.extend(pages_to_unmap) - # Update memory usage after unmapping pages - self._update_memory_usage() - - if len(self.free_page_list) < num_to_reclaim: - # Still not enough free pages - return False - - for _ in range(num_to_reclaim): - self.reclaimed_page_list.append(self.free_page_list.pop()) - self.num_free_pages -= num_to_reclaim - self.num_total_pages = new_num_pages - return True - - def trim(self) -> None: - """ - Trim the reserved pages to free up physical memory. 
- """ - with self._lock: - pages_to_unmap = list(self.reserved_page_list) # copy - self.reserved_page_list.clear() - - if not pages_to_unmap: - # Update memory usage after trimming - self._update_memory_usage() - return - - try: - self._lock.release() - self._unmap_pages(pages_to_unmap) - finally: - self._lock.acquire() - - self.free_page_list.extend(pages_to_unmap) - # Update memory usage after unmapping pages - self._update_memory_usage() - - def reset_free_page_order(self) -> None: - """Reset the free page list to sorted order. - - After free_pages + trim cycles, freed pages are appended to the - end of the deque so the ordering becomes scrambled. This method - re-sorts the free list so that low-numbered pages (starting with - page 0) are allocated first — important because SGLang expects - the very first block (on page 0) to be reserved as the null block. - """ - with self._lock: - sorted_pages = sorted(self.free_page_list) - self.free_page_list = deque(sorted_pages) - - def get_num_free_pages(self) -> int: - return self.num_free_pages - - def get_num_inuse_pages(self) -> int: - return self.num_total_pages - self.num_free_pages - - def get_num_total_pages(self) -> int: - return self.num_total_pages - - def get_num_reserved_pages(self) -> int: - with self._lock: - return len(self.reserved_page_list) - - def get_avail_physical_pages(self) -> int: - avail_phy_mem_size, total_phy_mem_size = torch.cuda.mem_get_info() - headroom = int(total_phy_mem_size * (1 - self.gpu_utilization)) - avail_phy_mem_size = max(avail_phy_mem_size - headroom, 0) - - # Calculate available pages considering layers and KV buffers - avail_phy_pages = avail_phy_mem_size // self.page_size - # Each layer needs num_kv_buffers pages (2 for MHA K+V, 1 for MLA). 
- avail_pages_per_layer = avail_phy_pages // self.num_layers // self.num_kv_buffers - return int(avail_pages_per_layer) - - def get_page_id(self, block_id: int, block_mem_size: int) -> int: - return block_id * block_mem_size // self.page_size - - # Private methods - def _prealloc_worker(self): - """Worker thread that preallocates and maps physical pages.""" - while self.prealloc_running: - with self._lock: - # Wait until preallocation is needed or thread is stopped - while not self.prealloc_needed and self.prealloc_running: - self._cond.wait() - - if not self.prealloc_running: - break - - self.prealloc_needed = False - current_reserved = len(self.reserved_page_list) - to_reserve = max(0, self.min_reserved_pages - current_reserved) - # Only try to reserve up to the available free pages - to_reserve = min(to_reserve, len(self.free_page_list), - self.get_avail_physical_pages()) - if to_reserve <= 0: - continue - - pages_to_reserve = [] - - # Get pages from free list - for _ in range(to_reserve): - if self.free_page_list: - pages_to_reserve.append(self.free_page_list.popleft()) - else: - break - - if pages_to_reserve: - try: - self._map_pages(pages_to_reserve) - with self._lock: - self.reserved_page_list.extend(pages_to_reserve) - # Update memory usage after mapping pages - self._update_memory_usage() - self._cond.notify_all() - logger.debug( - f"Preallocated {len(pages_to_reserve)} pages, " - f"reserved={len(self.reserved_page_list)}") - except Exception as e: - # If mapping fails, return pages to free list - with self._lock: - self.free_page_list.extendleft(pages_to_reserve) - self._cond.notify_all() - logger.error( - f"Failed to preallocate {len(pages_to_reserve)} pages: " - f"{e}") - - def _start_prealloc_thread(self): - if self.prealloc_thd is None: - self.prealloc_running = True - self.prealloc_thd = threading.Thread(target=self._prealloc_worker, - daemon=True) - self.prealloc_thd.start() - - # Initial preallocation trigger - self._trigger_preallocation() - - def 
_stop_prealloc_thread(self, timeout: Optional[float] = None): - if self.prealloc_thd is not None: - with self._lock: - self.prealloc_running = False - self._cond.notify_all() - self.prealloc_thd.join(timeout) - if self.prealloc_thd.is_alive(): - logger.warning( - "Preallocation thread did not stop within timeout") - self.prealloc_thd = None - logger.debug("Stopped page preallocation thread") - - def _trigger_preallocation(self): - """Trigger the preallocation thread to fill up reserved blocks""" - with self._lock: - self.prealloc_needed = True - self._cond.notify_all() - - def _map_pages(self, page_ids: list[int]) -> None: - if self.contiguous_layout: - offsets = [ - pid * self.page_size * self.num_layers * self.num_kv_buffers - for pid in page_ids - ] - else: - offsets = [pid * self.page_size for pid in page_ids] - if self.world_size > 1 or _should_use_worker_ipc(): - broadcast_map_to_kv_tensors(self.world_size, offsets, self.pp_rank, - group_id=self.group_id) - else: - map_to_kv_tensors(offsets, group_id=self.group_id) - - def _unmap_pages(self, page_ids: list[int]) -> None: - if self.contiguous_layout: - offsets = [ - pid * self.page_size * self.num_layers * self.num_kv_buffers - for pid in page_ids - ] - else: - offsets = [pid * self.page_size for pid in page_ids] - if self.world_size > 1 or _should_use_worker_ipc(): - broadcast_unmap_from_kv_tensors(self.world_size, offsets, - self.pp_rank, - group_id=self.group_id) - else: - if self.async_sched: - torch.cuda.synchronize() - unmap_from_kv_tensors(offsets, group_id=self.group_id) - - def _update_memory_usage(self): - """Update memory usage information in shared memory.""" - # Memory actively used by allocations (excludes preallocated pages). - used_phy_mem_size = (self.get_num_inuse_pages() * self.num_layers * - self.page_size * self.num_kv_buffers) - # Memory held by preallocated pages that are not yet actively used. 
- prealloc_phy_mem_size = (self.get_num_reserved_pages() * - self.num_layers * self.page_size * - self.num_kv_buffers) - - self.mem_info_tracker.update_memory_usage( - used_size=used_phy_mem_size, prealloc_size=prealloc_phy_mem_size) diff --git a/setup.py b/setup.py index 23a75981..3ea4ce83 100644 --- a/setup.py +++ b/setup.py @@ -31,8 +31,10 @@ def get_csrc_files(path) -> List[str]: src_dir = Path(path) # setuptools requires relative paths + # Filter out macOS AppleDouble metadata files (._* prefix) cpp_files = [ str(f.relative_to(SCRIPT_PATH)) for f in src_dir.rglob("*.cpp") + if not f.name.startswith("._") ] return cpp_files diff --git a/tests/test_kvcache_manager.py b/tests/test_kvcache_manager.py index 575e4930..f7162dbe 100644 --- a/tests/test_kvcache_manager.py +++ b/tests/test_kvcache_manager.py @@ -72,15 +72,13 @@ def setup_kvcache(): ) # wait a bit for pre-allocation to finish - if manager.page_allocator.enable_page_prealloc: - start_time = time.time() - timeout = 5.0 # seconds - min_pages = manager.page_allocator.min_reserved_pages - while len(manager.page_allocator.reserved_page_list) < min_pages: - if time.time() - start_time > timeout: - # This is not a hard failure, but test_trim might become flaky. - break - time.sleep(0.1) + start_time = time.time() + timeout = 5.0 # seconds + while manager.page_allocator.get_num_reserved_pages() == 0: + if time.time() - start_time > timeout: + # This is not a hard failure, but test_trim might become flaky. 
+ break + time.sleep(0.1) yield manager @@ -124,7 +122,7 @@ def test_resize_smaller_and_larger(setup_kvcache): # - mem_size: # used by resize method, corresponds K (or V) tensor size in 1 layer, typically in few GBs manager = setup_kvcache - initial_total_pages = manager.page_allocator.num_total_pages + initial_total_pages = manager.page_allocator.get_num_total_pages() initial_attribute_mem_size = manager.mem_size meminfo = get_kv_cache_limit(IPC_NAME) assert meminfo is not None @@ -137,11 +135,11 @@ def test_resize_smaller_and_larger(setup_kvcache): # update the shm total_size field update_kv_cache_limit(IPC_NAME, shrink_kv_cache_limit) # infer the new mem_size based on shm total_size --- workflow in kvcached - shrink_shm_mem_size = manager.page_allocator.mem_info_tracker.check_and_get_resize_target( - manager.mem_size, manager.num_layers) + shrink_shm_mem_size = manager.page_allocator.check_and_get_resize_target( + manager.mem_size) # actual resize method manager.resize(shrink_shm_mem_size) - shrink_total_pages = manager.page_allocator.num_total_pages + shrink_total_pages = manager.page_allocator.get_num_total_pages() assert initial_total_pages == shrink_total_pages + initial_total_pages // 2 # RESIZE LARGER: add back the deducted half of initial total pages @@ -149,11 +147,11 @@ def test_resize_smaller_and_larger(setup_kvcache): # update the shm total_size field update_kv_cache_limit(IPC_NAME, expand_kv_cache_limit) # infer the new mem_size based on shm total_size --- workflow in kvcached - expand_shm_mem_size = manager.page_allocator.mem_info_tracker.check_and_get_resize_target( - shrink_shm_mem_size, manager.num_layers) + expand_shm_mem_size = manager.page_allocator.check_and_get_resize_target( + shrink_shm_mem_size) # actual resize method manager.resize(expand_shm_mem_size) - expand_total_pages = manager.page_allocator.num_total_pages + expand_total_pages = manager.page_allocator.get_num_total_pages() assert expand_total_pages == initial_total_pages @@ 
-161,14 +159,13 @@ def test_trim(setup_kvcache): # instantiate a kv cache manager with known size manager = setup_kvcache - # initial reserved pages - initial_reserved = len(manager.page_allocator.reserved_page_list) - if manager.page_allocator.enable_page_prealloc: - assert initial_reserved == manager.page_allocator.min_reserved_pages + # initial reserved pages (assumes prealloc is enabled, which is the default) + initial_reserved = manager.page_allocator.get_num_reserved_pages() + assert initial_reserved > 0 # trim reserved pages manager.trim() - after_trim_reserved = len(manager.page_allocator.reserved_page_list) + after_trim_reserved = manager.page_allocator.get_num_reserved_pages() assert after_trim_reserved == 0