Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions nanovllm/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
from transformers import AutoConfig


@dataclass
class KVTransferConfig:
"""KV Cache 传输配置(CPU Offload 等场景)。

Attributes:
kv_connector: 连接器类名,如 "CPUOffloadConnector"
kv_role: 连接器角色,"kv_both" 表示同时承担 send/recv
swap_in_threshold: swap in 触发阈值
cpu_swap_space_gb: CPU 端交换空间大小(GB)
"""
kv_connector: str = ""
kv_role: str = "kv_both"
swap_in_threshold: int = 0
cpu_swap_space_gb: float = 100.0


@dataclass
class Config:
model: str
Expand All @@ -16,6 +32,7 @@ class Config:
eos: int = -1
kvcache_block_size: int = 256
num_kvcache_blocks: int = -1
kv_transfer_config: KVTransferConfig | None = None

def __post_init__(self):
assert os.path.isdir(self.model)
Expand Down
1 change: 1 addition & 0 deletions nanovllm/distributed/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from nanovllm.distributed.cpu_offload_connector import CPUOffloadConnector
163 changes: 163 additions & 0 deletions nanovllm/distributed/cpu_offload_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""CPU Offload Connector 主模块。

实现 GPU/NPU KV Cache 到 CPU(Host DRAM)的卸载与加载连接器。
在 Connector 内部启动 metadata server,管理 CPU KV Cache 并提供
RPC 调用接口,支持多个 data-parallel EngineCore 实例之间共享 KV Cache。
对应 vllm-ascend PR #1659 中的 cpu_offload_connector.py。
"""

import torch

from nanovllm.distributed.cpu_offload_manager.cpu_kv_cache_manager import CPUKVCacheManager
from nanovllm.distributed.cpu_offload_manager.metadata import (
OffloadMetadata,
SwapRequest,
build_offload_metadata,
)


class CPUOffloadConnector:
"""KV Cache CPU 卸载连接器。

负责协调 GPU 端 Attention 层与 CPU 端 KV Cache Manager 之间的数据传输。
支持在 prefill 完成后将 KV Cache 卸载到 CPU,以及在需要时从 CPU 换回 GPU。

Args:
num_layers: 模型 Transformer 层数
block_size: 每个 KV block 的 token 数
num_kv_heads: KV head 数量
head_dim: 每个 head 的维度
dtype: 数据类型
swap_in_threshold: swap in 触发阈值(GPU 空闲 block 低于此值时触发)
cpu_swap_space_gb: CPU 端预分配的交换空间(GB)
"""

def __init__(
self,
num_layers: int,
block_size: int,
num_kv_heads: int,
head_dim: int,
dtype: torch.dtype = torch.float16,
swap_in_threshold: int = 0,
cpu_swap_space_gb: float = 100.0,
):
self.num_layers = num_layers
self.block_size = block_size
self.num_kv_heads = num_kv_heads
self.head_dim = head_dim
self.dtype = dtype
self.swap_in_threshold = swap_in_threshold
self.cpu_swap_space_gb = cpu_swap_space_gb

self.manager: CPUKVCacheManager | None = None # 延迟初始化
self.pending_swap_out: list[SwapRequest] = [] # 待执行的 swap out 请求队列
self.pending_swap_in: list[SwapRequest] = [] # 待执行的 swap in 请求队列
self._is_initialized: bool = False

def initialize(self) -> None:
"""初始化连接器:创建 CPU KV Cache Manager 并启动 metadata server。

Returns:
None
"""
# --- 执行逻辑 ---
# 1. 创建 CPUKVCacheManager 实例,传入配置参数
# 2. 调用 manager.init_pool() 在 CPU 内存上预分配 KV Cache 存储池
# 3. 启动 metadata server(RPC 服务),监听来自其他 DP 实例的请求
# - 绑定端口,注册 swap_out / swap_in / query 等 RPC 方法
# - 使 metadata 和 CPU KV Cache 可被多个 DP EngineCore 共享
# 4. 设置 _is_initialized = True
pass

def send_kv_caches_and_hidden_states(
self,
layer_name: str,
seq_id: int,
gpu_k_cache: torch.Tensor,
gpu_v_cache: torch.Tensor,
block_ids: list[int],
token_length: int,
is_prefill: bool,
) -> bool:
"""在 prefill 完成后将指定层的 KV Cache 卸载到 CPU(swap out)。

Args:
layer_name: 模型层名称,如 "layers.0.self_attn"
seq_id: 序列 ID
gpu_k_cache: GPU 端该层的 K cache 张量
gpu_v_cache: GPU 端该层的 V cache 张量
block_ids: 需要卸载的 GPU block ID 列表
token_length: 序列当前 token 长度
is_prefill: 是否处于 prefill 阶段

Returns:
bool: 卸载是否成功
"""
# --- 执行逻辑 ---
# 1. 构建 OffloadMetadata(调用 build_offload_metadata)
# 2. 创建 SwapRequest,direction="swap_out"
# 3. 调用 manager.swap_out(gpu_k_cache, gpu_v_cache, metadata)
# 将 GPU 上的 KV Cache 异步拷贝到 CPU pin_memory 区域
# 4. 返回操作是否成功
return False

def recv_kv_caches_and_hidden_states(
self,
layer_name: str,
seq_id: int,
gpu_k_cache: torch.Tensor,
gpu_v_cache: torch.Tensor,
block_ids: list[int],
token_length: int,
is_prefill: bool,
) -> bool:
"""在需要时将 KV Cache 从 CPU 加载回 GPU(swap in)。

Args:
layer_name: 模型层名称
seq_id: 序列 ID
gpu_k_cache: GPU 端该层的 K cache 张量(写入目标)
gpu_v_cache: GPU 端该层的 V cache 张量(写入目标)
block_ids: 需要加载的 GPU block ID 列表
token_length: 序列当前 token 长度
is_prefill: 是否处于 prefill 阶段

Returns:
bool: 加载是否成功
"""
# --- 执行逻辑 ---
# 1. 构建 OffloadMetadata
# 2. 检查 manager 中是否缓存了对应的 block
# 若未缓存,跳过(可能该 block 尚未被 swap out 过)
# 3. 调用 manager.swap_in(gpu_k_cache, gpu_v_cache, metadata)
# 将 CPU 上的 KV Cache 拷贝回 GPU
# 4. 返回操作是否成功
return False

def should_swap_in(self, num_free_gpu_blocks: int) -> bool:
"""判断是否应触发 swap in 操作。

Args:
num_free_gpu_blocks: GPU 端当前空闲的 block 数量

Returns:
bool: 是否需要从 CPU 换入 KV Cache 到 GPU
"""
# --- 执行逻辑 ---
# 若 GPU 空闲 block 数低于 swap_in_threshold,返回 True
# 否则返回 False
return False

def close(self) -> None:
"""关闭连接器,释放 CPU 资源并停止 metadata server。

Returns:
None
"""
# --- 执行逻辑 ---
# 1. 停止 metadata server(RPC 服务),关闭端口
# 2. 释放 manager 中的 CPU KV Cache 内存池
# 3. 清空 pending_swap_out 和 pending_swap_in 队列
# 4. 设置 _is_initialized = False
pass
2 changes: 2 additions & 0 deletions nanovllm/distributed/cpu_offload_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from nanovllm.distributed.cpu_offload_manager.cpu_kv_cache_manager import CPUKVCacheManager
from nanovllm.distributed.cpu_offload_manager.metadata import OffloadMetadata
176 changes: 176 additions & 0 deletions nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""CPU 端 KV Cache 管理器。

负责在 Host DRAM 上分配、存储和检索被卸载的 KV Cache 数据,
并通过 RPC 接口供多个 data-parallel EngineCore 实例共享访问。
对应 vllm-ascend PR #1659 中的 cpu_kv_cache_manager.py。
"""

import torch

from nanovllm.distributed.cpu_offload_manager.metadata import OffloadMetadata


class CPUKVCacheManager:
"""管理 CPU(Host DRAM)上的 KV Cache 存储池。

该管理器在 CPU 内存中维护一块预分配的 KV Cache 空间,
支持按 (seq_id, layer_name, block_id) 三元组进行存取。

Args:
cpu_swap_space_gb: CPU 端预分配的交换空间大小(GB)
num_layers: 模型的 Transformer 层数
block_size: 每个 KV block 包含的 token 数
num_kv_heads: KV head 数量
head_dim: 每个 head 的维度
dtype: 数据类型
"""

def __init__(
self,
cpu_swap_space_gb: float,
num_layers: int,
block_size: int,
num_kv_heads: int,
head_dim: int,
dtype: torch.dtype = torch.float16,
):
self.cpu_swap_space_gb = cpu_swap_space_gb
self.num_layers = num_layers
self.block_size = block_size
self.num_kv_heads = num_kv_heads
self.head_dim = head_dim
self.dtype = dtype

# --- 以下属性在 init_pool 中实际分配 ---
self.cpu_k_cache: torch.Tensor | None = None # CPU 端 K cache 存储
self.cpu_v_cache: torch.Tensor | None = None # CPU 端 V cache 存储
self.num_cpu_blocks: int = 0 # CPU 端总 block 数
self.free_cpu_block_ids: list[int] = [] # 空闲 CPU block 列表
self.block_mapping: dict[tuple, int] = {} # (seq_id, layer, gpu_block) → cpu_block 映射

def init_pool(self) -> None:
"""在 CPU 内存中预分配 KV Cache 存储池。

Returns:
None
"""
# --- 执行逻辑 ---
# 1. 根据 cpu_swap_space_gb 计算可容纳的 CPU block 总数:
# block_bytes = 2 * block_size * num_kv_heads * head_dim * dtype.itemsize
# num_cpu_blocks = cpu_swap_space_gb * 1024^3 / (num_layers * block_bytes)
# 2. 分配 cpu_k_cache: shape = [num_layers, num_cpu_blocks, block_size, num_kv_heads, head_dim]
# 分配 cpu_v_cache: 同上,均使用 pin_memory 以加速 GPU↔CPU 传输
# 3. 初始化 free_cpu_block_ids = list(range(num_cpu_blocks))
pass

def allocate_cpu_block(self) -> int:
"""从空闲池中分配一个 CPU block。

Returns:
int: 分配到的 CPU block ID,若无可用 block 则返回 -1
"""
# --- 执行逻辑 ---
# 1. 检查 free_cpu_block_ids 是否非空
# 2. 若非空,pop 出一个 block_id 并返回
# 3. 若为空,返回 -1 表示 CPU 空间不足
return -1

def free_cpu_block(self, cpu_block_id: int) -> None:
"""释放一个 CPU block,归还到空闲池。

Args:
cpu_block_id: 要释放的 CPU block ID

Returns:
None
"""
# --- 执行逻辑 ---
# 1. 将 cpu_block_id 追加到 free_cpu_block_ids
# 2. 从 block_mapping 中删除对应的映射条目
pass

def swap_out(
self,
gpu_k_cache: torch.Tensor,
gpu_v_cache: torch.Tensor,
metadata: OffloadMetadata,
) -> bool:
"""将指定层的 KV Cache 从 GPU 卸载到 CPU(swap out)。

Args:
gpu_k_cache: GPU 端该层的 K cache,shape = [num_blocks, block_size, num_kv_heads, head_dim]
gpu_v_cache: GPU 端该层的 V cache,shape 同上
metadata: 描述本次卸载的元数据(包含 seq_id, layer_name, block_ids 等)

Returns:
bool: 卸载是否成功(CPU 空间不足时返回 False)
"""
# --- 执行逻辑 ---
# 1. 解析 metadata 获取 layer_index 和 block_ids
# 2. 对每个 gpu_block_id in block_ids:
# a. 调用 allocate_cpu_block() 获取 cpu_block_id
# b. 若分配失败,回滚已分配的 block 并返回 False
# c. 用非阻塞拷贝将 gpu_k_cache[gpu_block_id] → cpu_k_cache[layer][cpu_block_id]
# d. 同理拷贝 V cache
# e. 记录 block_mapping[(seq_id, layer, gpu_block_id)] = cpu_block_id
# 3. 同步 CUDA stream 确保拷贝完成
# 4. 返回 True
return False

def swap_in(
self,
gpu_k_cache: torch.Tensor,
gpu_v_cache: torch.Tensor,
metadata: OffloadMetadata,
) -> bool:
"""将指定层的 KV Cache 从 CPU 加载回 GPU(swap in)。

Args:
gpu_k_cache: GPU 端该层的 K cache(目标写入位置)
gpu_v_cache: GPU 端该层的 V cache(目标写入位置)
metadata: 描述本次加载的元数据

Returns:
bool: 加载是否成功(找不到对应 CPU block 时返回 False)
"""
# --- 执行逻辑 ---
# 1. 解析 metadata 获取 layer_index, seq_id, block_ids
# 2. 对每个 gpu_block_id in block_ids:
# a. 通过 block_mapping 查找对应的 cpu_block_id
# b. 若找不到,说明数据不在 CPU 上,返回 False
# c. 用非阻塞拷贝将 cpu_k_cache[layer][cpu_block_id] → gpu_k_cache[gpu_block_id]
# d. 同理拷贝 V cache
# e. 释放已加载的 cpu_block(调用 free_cpu_block)
# 3. 同步 CUDA stream 确保拷贝完成
# 4. 返回 True
return False

def has_cached(self, seq_id: int, layer_name: str, block_id: int) -> bool:
"""查询某个 block 是否已被缓存在 CPU 端。

Args:
seq_id: 序列 ID
layer_name: 层名称
block_id: GPU 端 block ID

Returns:
bool: 是否存在于 CPU cache 中
"""
# --- 执行逻辑 ---
# 检查 (seq_id, layer_name, block_id) 是否在 block_mapping 中
return False

def clear_seq(self, seq_id: int) -> None:
"""清除某个序列在 CPU 端的所有缓存 block。

Args:
seq_id: 要清除的序列 ID

Returns:
None
"""
# --- 执行逻辑 ---
# 1. 遍历 block_mapping,找出所有 key 中 seq_id 匹配的条目
# 2. 对每个匹配条目调用 free_cpu_block 释放
# 3. 从 block_mapping 中删除这些条目
pass
Loading