diff --git a/nanovllm/config.py b/nanovllm/config.py index 959ffb357..329ca9eb0 100644 --- a/nanovllm/config.py +++ b/nanovllm/config.py @@ -3,6 +3,22 @@ from transformers import AutoConfig +@dataclass +class KVTransferConfig: + """KV Cache 传输配置(CPU Offload 等场景)。 + + Attributes: + kv_connector: 连接器类名,如 "CPUOffloadConnector" + kv_role: 连接器角色,"kv_both" 表示同时承担 send/recv + swap_in_threshold: swap in 触发阈值 + cpu_swap_space_gb: CPU 端交换空间大小(GB) + """ + kv_connector: str = "" + kv_role: str = "kv_both" + swap_in_threshold: int = 0 + cpu_swap_space_gb: float = 100.0 + + @dataclass class Config: model: str @@ -16,6 +32,7 @@ class Config: eos: int = -1 kvcache_block_size: int = 256 num_kvcache_blocks: int = -1 + kv_transfer_config: KVTransferConfig | None = None def __post_init__(self): assert os.path.isdir(self.model) diff --git a/nanovllm/distributed/__init__.py b/nanovllm/distributed/__init__.py new file mode 100644 index 000000000..b53738650 --- /dev/null +++ b/nanovllm/distributed/__init__.py @@ -0,0 +1 @@ +from nanovllm.distributed.cpu_offload_connector import CPUOffloadConnector diff --git a/nanovllm/distributed/cpu_offload_connector.py b/nanovllm/distributed/cpu_offload_connector.py new file mode 100644 index 000000000..ca16e8129 --- /dev/null +++ b/nanovllm/distributed/cpu_offload_connector.py @@ -0,0 +1,163 @@ +"""CPU Offload Connector 主模块。 + +实现 GPU/NPU KV Cache 到 CPU(Host DRAM)的卸载与加载连接器。 +在 Connector 内部启动 metadata server,管理 CPU KV Cache 并提供 +RPC 调用接口,支持多个 data-parallel EngineCore 实例之间共享 KV Cache。 +对应 vllm-ascend PR #1659 中的 cpu_offload_connector.py。 +""" + +import torch + +from nanovllm.distributed.cpu_offload_manager.cpu_kv_cache_manager import CPUKVCacheManager +from nanovllm.distributed.cpu_offload_manager.metadata import ( + OffloadMetadata, + SwapRequest, + build_offload_metadata, +) + + +class CPUOffloadConnector: + """KV Cache CPU 卸载连接器。 + + 负责协调 GPU 端 Attention 层与 CPU 端 KV Cache Manager 之间的数据传输。 + 支持在 prefill 完成后将 KV Cache 卸载到 CPU,以及在需要时从 CPU 换回 GPU。 + + Args: + num_layers: 模型 Transformer 层数 + block_size: 每个 KV block 的 token 数 + num_kv_heads: KV head 数量 + head_dim: 每个 head 的维度 + dtype: 数据类型 + swap_in_threshold: swap in 触发阈值(GPU 空闲 block 低于此值时触发) + cpu_swap_space_gb: CPU 端预分配的交换空间(GB) + """ + + def __init__( + self, + num_layers: int, + block_size: int, + num_kv_heads: int, + head_dim: int, + dtype: torch.dtype = torch.float16, + swap_in_threshold: int = 0, + cpu_swap_space_gb: float = 100.0, + ): + self.num_layers = num_layers + self.block_size = block_size + self.num_kv_heads = num_kv_heads + self.head_dim = head_dim + self.dtype = dtype + self.swap_in_threshold = swap_in_threshold + self.cpu_swap_space_gb = cpu_swap_space_gb + + self.manager: CPUKVCacheManager | None = None # 延迟初始化 + self.pending_swap_out: list[SwapRequest] = [] # 待执行的 swap out 请求队列 + self.pending_swap_in: list[SwapRequest] = [] # 待执行的 swap in 请求队列 + self._is_initialized: bool = False + + def initialize(self) -> None: + """初始化连接器:创建 CPU KV Cache Manager 并启动 metadata server。 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 创建 CPUKVCacheManager 实例,传入配置参数 + # 2. 调用 manager.init_pool() 在 CPU 内存上预分配 KV Cache 存储池 + # 3. 启动 metadata server(RPC 服务),监听来自其他 DP 实例的请求 + # - 绑定端口,注册 swap_out / swap_in / query 等 RPC 方法 + # - 使 metadata 和 CPU KV Cache 可被多个 DP EngineCore 共享 + # 4. 设置 _is_initialized = True + pass + + def send_kv_caches_and_hidden_states( + self, + layer_name: str, + seq_id: int, + gpu_k_cache: torch.Tensor, + gpu_v_cache: torch.Tensor, + block_ids: list[int], + token_length: int, + is_prefill: bool, + ) -> bool: + """在 prefill 完成后将指定层的 KV Cache 卸载到 CPU(swap out)。 + + Args: + layer_name: 模型层名称,如 "layers.0.self_attn" + seq_id: 序列 ID + gpu_k_cache: GPU 端该层的 K cache 张量 + gpu_v_cache: GPU 端该层的 V cache 张量 + block_ids: 需要卸载的 GPU block ID 列表 + token_length: 序列当前 token 长度 + is_prefill: 是否处于 prefill 阶段 + + Returns: + bool: 卸载是否成功 + """ + # --- 执行逻辑 --- + # 1. 构建 OffloadMetadata(调用 build_offload_metadata) + # 2. 创建 SwapRequest,direction="swap_out" + # 3. 调用 manager.swap_out(gpu_k_cache, gpu_v_cache, metadata) + # 将 GPU 上的 KV Cache 异步拷贝到 CPU pin_memory 区域 + # 4. 返回操作是否成功 + return False + + def recv_kv_caches_and_hidden_states( + self, + layer_name: str, + seq_id: int, + gpu_k_cache: torch.Tensor, + gpu_v_cache: torch.Tensor, + block_ids: list[int], + token_length: int, + is_prefill: bool, + ) -> bool: + """在需要时将 KV Cache 从 CPU 加载回 GPU(swap in)。 + + Args: + layer_name: 模型层名称 + seq_id: 序列 ID + gpu_k_cache: GPU 端该层的 K cache 张量(写入目标) + gpu_v_cache: GPU 端该层的 V cache 张量(写入目标) + block_ids: 需要加载的 GPU block ID 列表 + token_length: 序列当前 token 长度 + is_prefill: 是否处于 prefill 阶段 + + Returns: + bool: 加载是否成功 + """ + # --- 执行逻辑 --- + # 1. 构建 OffloadMetadata + # 2. 检查 manager 中是否缓存了对应的 block + # 若未缓存,跳过(可能该 block 尚未被 swap out 过) + # 3. 调用 manager.swap_in(gpu_k_cache, gpu_v_cache, metadata) + # 将 CPU 上的 KV Cache 拷贝回 GPU + # 4. 返回操作是否成功 + return False + + def should_swap_in(self, num_free_gpu_blocks: int) -> bool: + """判断是否应触发 swap in 操作。 + + Args: + num_free_gpu_blocks: GPU 端当前空闲的 block 数量 + + Returns: + bool: 是否需要从 CPU 换入 KV Cache 到 GPU + """ + # --- 执行逻辑 --- + # 若 GPU 空闲 block 数低于 swap_in_threshold,返回 True + # 否则返回 False + return False + + def close(self) -> None: + """关闭连接器,释放 CPU 资源并停止 metadata server。 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 停止 metadata server(RPC 服务),关闭端口 + # 2. 释放 manager 中的 CPU KV Cache 内存池 + # 3. 清空 pending_swap_out 和 pending_swap_in 队列 + # 4. 设置 _is_initialized = False + pass diff --git a/nanovllm/distributed/cpu_offload_manager/__init__.py b/nanovllm/distributed/cpu_offload_manager/__init__.py new file mode 100644 index 000000000..f5f64af56 --- /dev/null +++ b/nanovllm/distributed/cpu_offload_manager/__init__.py @@ -0,0 +1,2 @@ +from nanovllm.distributed.cpu_offload_manager.cpu_kv_cache_manager import CPUKVCacheManager +from nanovllm.distributed.cpu_offload_manager.metadata import OffloadMetadata diff --git a/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py b/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py new file mode 100644 index 000000000..fbc3be205 --- /dev/null +++ b/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py @@ -0,0 +1,176 @@ +"""CPU 端 KV Cache 管理器。 + +负责在 Host DRAM 上分配、存储和检索被卸载的 KV Cache 数据, +并通过 RPC 接口供多个 data-parallel EngineCore 实例共享访问。 +对应 vllm-ascend PR #1659 中的 cpu_kv_cache_manager.py。 +""" + +import torch + +from nanovllm.distributed.cpu_offload_manager.metadata import OffloadMetadata + + +class CPUKVCacheManager: + """管理 CPU(Host DRAM)上的 KV Cache 存储池。 + + 该管理器在 CPU 内存中维护一块预分配的 KV Cache 空间, + 支持按 (seq_id, layer_name, block_id) 三元组进行存取。 + + Args: + cpu_swap_space_gb: CPU 端预分配的交换空间大小(GB) + num_layers: 模型的 Transformer 层数 + block_size: 每个 KV block 包含的 token 数 + num_kv_heads: KV head 数量 + head_dim: 每个 head 的维度 + dtype: 数据类型 + """ + + def __init__( + self, + cpu_swap_space_gb: float, + num_layers: int, + block_size: int, + num_kv_heads: int, + head_dim: int, + dtype: torch.dtype = torch.float16, + ): + self.cpu_swap_space_gb = cpu_swap_space_gb + self.num_layers = num_layers + self.block_size = block_size + self.num_kv_heads = num_kv_heads + self.head_dim = head_dim + self.dtype = dtype + + # --- 以下属性在 init_pool 中实际分配 --- + self.cpu_k_cache: torch.Tensor | None = None # CPU 端 K cache 存储 + self.cpu_v_cache: torch.Tensor | None = None # CPU 端 V cache 存储 + self.num_cpu_blocks: int = 0 # CPU 端总 block 数 + self.free_cpu_block_ids: list[int] = [] # 空闲 CPU block 列表 + self.block_mapping: dict[tuple, int] = {} # (seq_id, layer, gpu_block) → cpu_block 映射 + + def init_pool(self) -> None: + """在 CPU 内存中预分配 KV Cache 存储池。 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 根据 cpu_swap_space_gb 计算可容纳的 CPU block 总数: + # block_bytes = 2 * block_size * num_kv_heads * head_dim * dtype.itemsize + # num_cpu_blocks = cpu_swap_space_gb * 1024^3 / (num_layers * block_bytes) + # 2. 分配 cpu_k_cache: shape = [num_layers, num_cpu_blocks, block_size, num_kv_heads, head_dim] + # 分配 cpu_v_cache: 同上,均使用 pin_memory 以加速 GPU↔CPU 传输 + # 3. 初始化 free_cpu_block_ids = list(range(num_cpu_blocks)) + pass + + def allocate_cpu_block(self) -> int: + """从空闲池中分配一个 CPU block。 + + Returns: + int: 分配到的 CPU block ID,若无可用 block 则返回 -1 + """ + # --- 执行逻辑 --- + # 1. 检查 free_cpu_block_ids 是否非空 + # 2. 若非空,pop 出一个 block_id 并返回 + # 3. 若为空,返回 -1 表示 CPU 空间不足 + return -1 + + def free_cpu_block(self, cpu_block_id: int) -> None: + """释放一个 CPU block,归还到空闲池。 + + Args: + cpu_block_id: 要释放的 CPU block ID + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 将 cpu_block_id 追加到 free_cpu_block_ids + # 2. 从 block_mapping 中删除对应的映射条目 + pass + + def swap_out( + self, + gpu_k_cache: torch.Tensor, + gpu_v_cache: torch.Tensor, + metadata: OffloadMetadata, + ) -> bool: + """将指定层的 KV Cache 从 GPU 卸载到 CPU(swap out)。 + + Args: + gpu_k_cache: GPU 端该层的 K cache,shape = [num_blocks, block_size, num_kv_heads, head_dim] + gpu_v_cache: GPU 端该层的 V cache,shape 同上 + metadata: 描述本次卸载的元数据(包含 seq_id, layer_name, block_ids 等) + + Returns: + bool: 卸载是否成功(CPU 空间不足时返回 False) + """ + # --- 执行逻辑 --- + # 1. 解析 metadata 获取 layer_index 和 block_ids + # 2. 对每个 gpu_block_id in block_ids: + # a. 调用 allocate_cpu_block() 获取 cpu_block_id + # b. 若分配失败,回滚已分配的 block 并返回 False + # c. 用非阻塞拷贝将 gpu_k_cache[gpu_block_id] → cpu_k_cache[layer][cpu_block_id] + # d. 同理拷贝 V cache + # e. 记录 block_mapping[(seq_id, layer, gpu_block_id)] = cpu_block_id + # 3. 同步 CUDA stream 确保拷贝完成 + # 4. 返回 True + return False + + def swap_in( + self, + gpu_k_cache: torch.Tensor, + gpu_v_cache: torch.Tensor, + metadata: OffloadMetadata, + ) -> bool: + """将指定层的 KV Cache 从 CPU 加载回 GPU(swap in)。 + + Args: + gpu_k_cache: GPU 端该层的 K cache(目标写入位置) + gpu_v_cache: GPU 端该层的 V cache(目标写入位置) + metadata: 描述本次加载的元数据 + + Returns: + bool: 加载是否成功(找不到对应 CPU block 时返回 False) + """ + # --- 执行逻辑 --- + # 1. 解析 metadata 获取 layer_index, seq_id, block_ids + # 2. 对每个 gpu_block_id in block_ids: + # a. 通过 block_mapping 查找对应的 cpu_block_id + # b. 若找不到,说明数据不在 CPU 上,返回 False + # c. 用非阻塞拷贝将 cpu_k_cache[layer][cpu_block_id] → gpu_k_cache[gpu_block_id] + # d. 同理拷贝 V cache + # e. 释放已加载的 cpu_block(调用 free_cpu_block) + # 3. 同步 CUDA stream 确保拷贝完成 + # 4. 返回 True + return False + + def has_cached(self, seq_id: int, layer_name: str, block_id: int) -> bool: + """查询某个 block 是否已被缓存在 CPU 端。 + + Args: + seq_id: 序列 ID + layer_name: 层名称 + block_id: GPU 端 block ID + + Returns: + bool: 是否存在于 CPU cache 中 + """ + # --- 执行逻辑 --- + # 检查 (seq_id, layer_name, block_id) 是否在 block_mapping 中 + return False + + def clear_seq(self, seq_id: int) -> None: + """清除某个序列在 CPU 端的所有缓存 block。 + + Args: + seq_id: 要清除的序列 ID + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 遍历 block_mapping,找出所有 key 中 seq_id 匹配的条目 + # 2. 对每个匹配条目调用 free_cpu_block 释放 + # 3. 从 block_mapping 中删除这些条目 + pass diff --git a/nanovllm/distributed/cpu_offload_manager/metadata.py b/nanovllm/distributed/cpu_offload_manager/metadata.py new file mode 100644 index 000000000..eabd7fe32 --- /dev/null +++ b/nanovllm/distributed/cpu_offload_manager/metadata.py @@ -0,0 +1,71 @@ +"""CPU Offload 元数据定义模块。 + +定义 KV Cache 在 GPU 与 CPU 之间卸载时所需的元数据结构。 +对应 vllm-ascend PR #1659 中的 metadata.py。 +""" + +from dataclasses import dataclass, field + + +@dataclass +class OffloadMetadata: + """单次 KV Cache 卸载/加载操作的元数据。 + + Attributes: + seq_id: 请求序列的唯一标识 + layer_name: 模型层名称,如 "layers.0.self_attn" + block_ids: 需要卸载/加载的物理 block ID 列表 + token_length: 该序列当前的 token 总长度 + is_prefill: 当前是否处于 prefill 阶段 + """ + seq_id: int = -1 + layer_name: str = "" + block_ids: list[int] = field(default_factory=list) + token_length: int = 0 + is_prefill: bool = False + + +@dataclass +class SwapRequest: + """描述一次 GPU ↔ CPU 交换请求。 + + Attributes: + metadata: 本次交换对应的元数据 + direction: 交换方向,"swap_out" 表示 GPU→CPU,"swap_in" 表示 CPU→GPU + priority: 优先级,数值越小优先级越高 + """ + metadata: OffloadMetadata = field(default_factory=OffloadMetadata) + direction: str = "swap_out" + priority: int = 0 + + +def build_offload_metadata( + seq_id: int, + layer_name: str, + block_ids: list[int], + token_length: int, + is_prefill: bool, +) -> OffloadMetadata: + """根据当前推理状态构建卸载元数据。 + + Args: + seq_id: 请求序列 ID + layer_name: 模型层名称 + block_ids: 涉及的物理 block ID 列表 + token_length: 序列当前 token 总长度 + is_prefill: 是否处于 prefill 阶段 + + Returns: + OffloadMetadata: 填充完毕的元数据对象 + """ + # --- 执行逻辑 --- + # 1. 将传入参数直接封装为 OffloadMetadata 数据类 + # 2. 返回该元数据实例,供 Connector 和 Manager 在后续 + # swap_out / swap_in 操作中使用 + return OffloadMetadata( + seq_id=seq_id, + layer_name=layer_name, + block_ids=block_ids, + token_length=token_length, + is_prefill=is_prefill, + ) diff --git a/nanovllm/engine/model_runner.py b/nanovllm/engine/model_runner.py index f66c38efd..c94881c47 100644 --- a/nanovllm/engine/model_runner.py +++ b/nanovllm/engine/model_runner.py @@ -8,6 +8,7 @@ from nanovllm.engine.sequence import Sequence from nanovllm.models.qwen3 import Qwen3ForCausalLM from nanovllm.layers.sampler import Sampler +from nanovllm.layers.attention_utils import set_connector from nanovllm.utils.context import set_context, get_context, reset_context from nanovllm.utils.loader import load_model @@ -33,6 +34,7 @@ def __init__(self, config: Config, rank: int, event: Event | list[Event]): self.sampler = Sampler() self.warmup_model() self.allocate_kv_cache() + self._maybe_init_cpu_offload_connector() if not self.enforce_eager: self.capture_cudagraph() torch.set_default_device("cpu") @@ -117,6 +119,24 @@ def allocate_kv_cache(self): module.v_cache = self.kv_cache[1, layer_id] layer_id += 1 + def _maybe_init_cpu_offload_connector(self) -> None: + """根据配置决定是否初始化 CPU Offload Connector。 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 检查 config.kv_transfer_config 是否为 None,若是则跳过 + # 2. 若配置了 kv_connector == "CPUOffloadConnector": + # a. 从 config 和 hf_config 中提取 num_layers, block_size, + # num_kv_heads, head_dim, dtype 等参数 + # b. 创建 CPUOffloadConnector 实例 + # c. 调用 connector.initialize() 分配 CPU 内存池并启动 metadata server + # d. 调用 set_connector(connector) 设置全局引用, + # 使 Attention 层的 wait_for_kv_layer / maybe_save_kv_layer 能访问到 + # 3. 若不匹配已知 connector 类型,忽略 + pass + def prepare_block_tables(self, seqs: list[Sequence]): max_len = max(len(seq.block_table) for seq in seqs) block_tables = [seq.block_table + [-1] * (max_len - len(seq.block_table)) for seq in seqs] diff --git a/nanovllm/layers/attention.py b/nanovllm/layers/attention.py index e416139ea..f312e8b5a 100644 --- a/nanovllm/layers/attention.py +++ b/nanovllm/layers/attention.py @@ -5,6 +5,10 @@ from flash_attn import flash_attn_varlen_func, flash_attn_with_kvcache from nanovllm.utils.context import get_context +from nanovllm.layers.attention_utils import ( + wait_for_kv_layer_from_connector, + maybe_save_kv_layer_to_connector, +) @triton.jit @@ -48,17 +52,23 @@ def __init__( head_dim, scale, num_kv_heads, + layer_name: str = "", ): super().__init__() self.num_heads = num_heads self.head_dim = head_dim self.scale = scale self.num_kv_heads = num_kv_heads + self.layer_name = layer_name self.k_cache = self.v_cache = torch.tensor([]) def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor): context = get_context() k_cache, v_cache = self.k_cache, self.v_cache + + # --- CPU Offload: prefill 前尝试从 CPU 换入该层 KV Cache --- + wait_for_kv_layer_from_connector(self.layer_name, k_cache, v_cache) + if k_cache.numel() and v_cache.numel(): store_kvcache(k, v, k_cache, v_cache, context.slot_mapping) if context.is_prefill: @@ -68,8 +78,11 @@ def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor): max_seqlen_q=context.max_seqlen_q, cu_seqlens_q=context.cu_seqlens_q, max_seqlen_k=context.max_seqlen_k, cu_seqlens_k=context.cu_seqlens_k, softmax_scale=self.scale, causal=True, block_table=context.block_tables) + + # --- CPU Offload: prefill 后尝试将该层 KV Cache 卸载到 CPU --- + maybe_save_kv_layer_to_connector(self.layer_name, k_cache, v_cache) else: # decode o = flash_attn_with_kvcache(q.unsqueeze(1), k_cache, v_cache, - cache_seqlens=context.context_lens, block_table=context.block_tables, + cache_seqlens=context.context_lens, block_table=context.block_tables, softmax_scale=self.scale, causal=True) return o diff --git a/nanovllm/layers/attention_utils.py b/nanovllm/layers/attention_utils.py new file mode 100644 index 000000000..d6075226e --- /dev/null +++ b/nanovllm/layers/attention_utils.py @@ -0,0 +1,99 @@ +"""Attention 层与 CPU Offload Connector 的集成工具函数。 + +提供在 Attention forward 前后调用的钩子函数,用于: +- prefill 前:从 CPU 换入(swap in)该层 KV Cache +- prefill 后:将该层 KV Cache 卸载(swap out)到 CPU + +支持两种 KV Cache 格式: +- 标准 MHA:传入 (k_cache, v_cache) 两个张量 +- MLA 压缩格式:传入 (kv_c_cache, k_pe_cache) 两个张量 +两者接口统一,通过 *kv_cache_tensors 可变参数适配。 + +对应 vllm-ascend PR #1659 中从 attention_v1.py 提取到 utils.py 的两个函数。 +""" + +import torch + +# 全局 connector 引用,由 ModelRunner 初始化时设置 +_CONNECTOR = None + + +def set_connector(connector) -> None: + """设置全局 CPU Offload Connector 实例。 + + Args: + connector: CPUOffloadConnector 实例,或 None 表示不启用 offload + """ + global _CONNECTOR + _CONNECTOR = connector + + +def get_connector(): + """获取全局 CPU Offload Connector 实例。 + + Returns: + CPUOffloadConnector | None + """ + return _CONNECTOR + + +def wait_for_kv_layer_from_connector( + layer_name: str, + *kv_cache_tensors: torch.Tensor, +) -> None: + """在 Attention forward 之前调用,等待从 CPU 换入该层的 KV Cache。 + + 若未启用 CPU Offload(connector 为 None),则直接返回不做任何操作。 + + 支持两种调用方式: + - 标准 MHA:wait_for_kv_layer_from_connector(name, k_cache, v_cache) + - MLA 格式:wait_for_kv_layer_from_connector(name, kv_c_cache, k_pe_cache) + + Args: + layer_name: 当前层名称,如 "layers.0.self_attn" + *kv_cache_tensors: GPU/NPU 端该层的 KV cache 张量(2 个) + + Returns: + None(原地写入 kv_cache_tensors) + """ + # --- 执行逻辑 --- + # 1. 检查全局 _CONNECTOR 是否为 None,若是则直接 return + # 2. 从当前推理上下文(get_context())中获取 seq_id、block_ids 等信息 + # 3. 检查该层对应的 block 是否已被 swap out 到 CPU + # 4. 调用 connector.recv_kv_caches_and_hidden_states( + # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], + # block_ids, token_length, is_prefill) + # 将 CPU 上缓存的 KV 数据拷贝回 GPU/NPU 端 + # 5. 等待传输完成(stream 同步),确保后续算子读到有效数据 + pass + + +def maybe_save_kv_layer_to_connector( + layer_name: str, + *kv_cache_tensors: torch.Tensor, +) -> None: + """在 prefill Attention forward 之后调用,尝试将该层 KV Cache 卸载到 CPU。 + + 仅在 prefill 阶段且启用了 CPU Offload 时生效。 + + 支持两种调用方式: + - 标准 MHA:maybe_save_kv_layer_to_connector(name, k_cache, v_cache) + - MLA 格式:maybe_save_kv_layer_to_connector(name, kv_c_cache, k_pe_cache) + + Args: + layer_name: 当前层名称 + *kv_cache_tensors: GPU/NPU 端该层的 KV cache 张量(2 个) + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 检查全局 _CONNECTOR 是否为 None,若是则直接 return + # 2. 从当前推理上下文中获取 seq_id、block_ids、token_length、is_prefill + # 3. 仅在 is_prefill == True 时执行卸载 + # 4. 调用 connector.send_kv_caches_and_hidden_states( + # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], + # block_ids, token_length, is_prefill) + # 通过异步拷贝将 GPU/NPU 上的 KV Cache 数据写入 CPU pin_memory 区域 + # 5. 异步执行不等待——减少对推理延迟的影响 + pass diff --git a/nanovllm/layers/mla_attention.py b/nanovllm/layers/mla_attention.py new file mode 100644 index 000000000..5b4d55450 --- /dev/null +++ b/nanovllm/layers/mla_attention.py @@ -0,0 +1,190 @@ +"""Multi-head Latent Attention (MLA) 实现。 + +MLA 是 DeepSeek-V2 提出的注意力机制,核心思想: + - 不存储完整的 K、V(每头 head_dim 维),而是压缩为低秩隐向量 kv_c(kv_lora_rank 维) + - KV Cache 只存 (kv_c, k_pe),相比标准 MHA 大幅节省显存 + - 计算 Attention 时再通过 kv_b_proj 将 kv_c 展开为完整的 K_nope, V + +CPU Offload 适配: + MLA 的压缩 KV Cache 数据量远小于标准 MHA, + 使得 GPU↔CPU 卸载的传输量大幅降低,是长上下文场景的关键优化。 + +对应 vllm-ascend PR #1659 中的 mla_v1.py。 +""" + +import torch +from torch import nn + +from nanovllm.utils.context import get_context +from nanovllm.layers.attention_utils import ( + wait_for_kv_layer_from_connector, + maybe_save_kv_layer_to_connector, +) + + +class MLAAttention(nn.Module): + """Multi-head Latent Attention 模块。 + + KV 压缩流程: + hidden → kv_a_proj_with_mqa → [kv_c (kv_lora_rank), k_pe (qk_rope_head_dim)] + kv_c → kv_a_layernorm → kv_b_proj → [k_nope (num_heads * qk_nope_head_dim), v (num_heads * v_head_dim)] + + Query 压缩流程: + hidden → q_a_proj → q_c (q_lora_rank) + q_c → q_a_layernorm → q_b_proj → [q_nope (num_heads * qk_nope_head_dim), q_pe (num_heads * qk_rope_head_dim)] + + KV Cache 只存 [kv_c_cache, k_pe_cache],不存完整 K/V。 + + Args: + hidden_size: 模型隐藏层维度 + num_heads: 注意力头数 + qk_nope_head_dim: 每头 query/key 的非 RoPE 部分维度 + qk_rope_head_dim: 每头 query/key 的 RoPE 部分维度 + v_head_dim: 每头 value 的维度 + q_lora_rank: query 低秩压缩的秩 + kv_lora_rank: KV 低秩压缩的秩 + rope_theta: RoPE 的 theta 基数 + layer_name: 层名称标识,用于 CPU Offload 按层管理 + """ + + def __init__( + self, + hidden_size: int, + num_heads: int, + qk_nope_head_dim: int, + qk_rope_head_dim: int, + v_head_dim: int, + q_lora_rank: int, + kv_lora_rank: int, + rope_theta: float = 10000.0, + layer_name: str = "", + ): + super().__init__() + self.hidden_size = hidden_size + self.num_heads = num_heads + self.qk_nope_head_dim = qk_nope_head_dim + self.qk_rope_head_dim = qk_rope_head_dim + self.qk_head_dim = qk_nope_head_dim + qk_rope_head_dim + self.v_head_dim = v_head_dim + self.q_lora_rank = q_lora_rank + self.kv_lora_rank = kv_lora_rank + self.layer_name = layer_name + + self.scaling = self.qk_head_dim ** -0.5 + + # --- 以下权重矩阵在实际实现中为 nn.Linear / 并行 Linear --- + # q_a_proj: hidden_size → q_lora_rank (查询压缩) + # q_a_layernorm: RMSNorm(q_lora_rank) + # q_b_proj: q_lora_rank → num_heads * qk_head_dim (查询展开) + # kv_a_proj_with_mqa: hidden_size → kv_lora_rank + qk_rope_head_dim (KV压缩+位置) + # kv_a_layernorm: RMSNorm(kv_lora_rank) + # kv_b_proj: kv_lora_rank → num_heads * (qk_nope_head_dim + v_head_dim) (KV展开) + # o_proj: num_heads * v_head_dim → hidden_size (输出投影) + + # KV Cache:压缩表示,比标准 MHA 小得多 + # kv_c_cache: [num_blocks, block_size, kv_lora_rank] + # k_pe_cache: [num_blocks, block_size, qk_rope_head_dim] + self.kv_cache: list[torch.Tensor] = [] # [kv_c_cache, k_pe_cache] + + def _mla_preprocess( + self, + layer_name: str, + hidden_states: torch.Tensor, + slot_mapping: torch.Tensor, + is_prefill: bool, + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor | None]: + """MLA 预处理:压缩投影 + RMSNorm + RoPE + KV Cache 写入。 + + 对应 vllm-ascend 中 AscendMLAImpl._mla_preprocess(), + PR #1659 将 layer_name 加入签名以支持按层 CPU Offload。 + + Args: + layer_name: 当前层名称,用于 CPU Offload 按层定位 + hidden_states: 输入隐藏状态,shape = [num_tokens, hidden_size] + slot_mapping: 每个 token 在 KV Cache 中的 slot 位置 + is_prefill: 是否为 prefill 阶段 + + Returns: + q_nope: 查询非 RoPE 部分,shape = [num_tokens, num_heads, qk_nope_head_dim] + q_pe: 查询 RoPE 部分,shape = [num_tokens, num_heads, qk_rope_head_dim] + k_nope: 键非 RoPE 部分,shape = [num_tokens, num_heads, qk_nope_head_dim](prefill) + 或从 kv_c_cache 中重建(decode) + k_pe: 键 RoPE 部分,shape = [num_tokens, 1, qk_rope_head_dim] + v: 值,shape = [num_tokens, num_heads, v_head_dim](仅 prefill 返回) + decode 时为 None,由 attention kernel 内部从 cache 重建 + """ + # --- CPU Offload: 计算前先尝试从 CPU 换入该层的压缩 KV Cache --- + wait_for_kv_layer_from_connector(layer_name, *self.kv_cache) + + # --- 执行逻辑 --- + # 1. Query 压缩路径: + # q_c = q_a_proj(hidden_states) # [num_tokens, q_lora_rank] + # q_c = q_a_layernorm(q_c) # RMSNorm 归一化 + # q = q_b_proj(q_c) # [num_tokens, num_heads * qk_head_dim] + # q = q.view(num_tokens, num_heads, qk_head_dim) + # q_nope, q_pe = q.split([qk_nope_head_dim, qk_rope_head_dim], dim=-1) + # 对 q_pe 施加 RoPE 旋转 + # + # 2. KV 压缩路径: + # kv_no_split = kv_a_proj_with_mqa(hidden_states) # [num_tokens, kv_lora_rank + qk_rope_head_dim] + # + # 3. 对 kv_c 做 RMSNorm,对 k_pe 施加 RoPE: + # kv_c, k_pe = kv_no_split.split([kv_lora_rank, qk_rope_head_dim], dim=-1) + # kv_c_normed = kv_a_layernorm(kv_c) + # k_pe_roped = rotary_emb(k_pe) + # 将 kv_c_normed 和 k_pe_roped 写入 self.kv_cache 对应 slot + # + # 4. Prefill 分支: + # k_nope, v = kv_b_proj(kv_c_normed) 展开为完整 K_nope 和 V + # 返回 (q_nope, q_pe, k_nope, k_pe_roped, v) + # + # 5. Decode 分支: + # k_nope 和 v 不在此处展开——由 decode attention kernel + # 在读取 kv_c_cache 时通过 absorbed kv_b_proj 权重直接计算 + # 返回 (q_nope, q_pe, None, k_pe_roped, None) + pass + + def forward( + self, + layer_name: str, + hidden_states: torch.Tensor, + ) -> torch.Tensor: + """MLA forward:预处理 → Attention 计算 → 输出投影。 + + 对应 vllm-ascend 中 AscendMLAImpl.forward(), + PR #1659 增加 layer_name 参数以支持 CPU Offload。 + + Args: + layer_name: 当前层名称 + hidden_states: 输入隐藏状态,shape = [num_tokens, hidden_size] + + Returns: + output: 注意力输出,shape = [num_tokens, hidden_size] + """ + context = get_context() + + # --- 执行逻辑 --- + # 1. 调用 _mla_preprocess 获取 q_nope, q_pe, k_nope, k_pe, v + # q_nope, q_pe, k_nope, k_pe, v = self._mla_preprocess( + # layer_name, hidden_states, context.slot_mapping, context.is_prefill) + # + # 2. Prefill Attention(使用 flash_attn_varlen_func 或等效昇腾算子): + # a. 拼接 q = concat(q_nope, q_pe) → [num_tokens, num_heads, qk_head_dim] + # b. 拼接 k = concat(k_nope, k_pe.expand(num_heads)) → [num_tokens, num_heads, qk_head_dim] + # c. o = flash_attn_varlen(q, k, v, ..., causal=True) + # + # 3. Decode Attention(使用 flash_attn_with_kvcache 的 MLA 变体): + # a. 拼接 q = concat(q_nope, q_pe) + # b. attention kernel 内部直接从 kv_c_cache 和 k_pe_cache 读取, + # 通过 absorbed kv_b_proj 权重在 on-the-fly 展开 k_nope 和 v + # (避免在 HBM 中存储展开后的完整 K/V,节省带宽) + # c. o = mla_decode_attention(q, kv_c_cache, k_pe_cache, kv_b_proj_weight, ...) + # + # 4. CPU Offload: prefill 后将该层压缩 KV Cache 卸载到 CPU + # maybe_save_kv_layer_to_connector(layer_name, *self.kv_cache) + # + # 5. 输出投影: + # output = o_proj(o.flatten(-2, -1)) # [num_tokens, hidden_size] + # + # 6. 返回 output + pass diff --git a/nanovllm/models/deepseek_v2.py b/nanovllm/models/deepseek_v2.py new file mode 100644 index 000000000..2a511e5a4 --- /dev/null +++ b/nanovllm/models/deepseek_v2.py @@ -0,0 +1,249 @@ +"""DeepSeek-V2 模型定义(MLA + MoE 架构)。 + +还原 vllm-ascend PR #1659 中对 DeepSeek V2 模型的 CPU Offload 适配: +- DeepSeekV2Attention 使用 MLA(Multi-head Latent Attention),KV Cache 为压缩表示 +- layer_name 从 DecoderLayer 逐层传递到 MLAAttention,供 CPU Offload 按层管理 +- MoE(Mixture of Experts)层使用标准前馈网络桩 + +在昇腾 910C 上的特殊优化: +- MLA 的 KV Cache 是压缩的 (kv_c, k_pe),数据量远小于标准 MHA +- CPU Offload 时 scatter+burst 传输的数据量更小,进一步降低传输开销 +""" + +import torch +from torch import nn + +from nanovllm.layers.mla_attention import MLAAttention +from nanovllm.layers.layernorm import RMSNorm + + +class DeepSeekV2Attention(nn.Module): + """DeepSeek-V2 注意力层包装器。 + + 将 MLA 注意力封装为标准的 Attention 接口, + 负责 layer_name 的传递和 CPU Offload 的集成。 + + Args: + config: DeepSeek-V2 模型配置 + layer_idx: 当前层索引,用于生成 layer_name + """ + + def __init__(self, config, layer_idx: int = 0): + super().__init__() + self.layer_name = f"layers.{layer_idx}.self_attn" + + # --- 实际实现中从 config 读取以下参数 --- + # config.hidden_size: 隐藏层维度(如 5120) + # config.num_attention_heads: 注意力头数(如 128) + # config.qk_nope_head_dim: 非 RoPE 维度(如 128) + # config.qk_rope_head_dim: RoPE 维度(如 64) + # config.v_head_dim: V 维度(如 128) + # config.q_lora_rank: Q 压缩秩(如 1536) + # config.kv_lora_rank: KV 压缩秩(如 512) + + self.attn = None # MLAAttention 实例,实际实现中在此创建 + + def forward( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + ) -> torch.Tensor: + """DeepSeek-V2 注意力前向。 + + Args: + positions: 位置编码,shape = [num_tokens] + hidden_states: 输入隐藏状态,shape = [num_tokens, hidden_size] + + Returns: + output: 注意力输出,shape = [num_tokens, hidden_size] + """ + # --- 执行逻辑 --- + # 1. 调用 MLA 注意力,传入 layer_name 以支持按层 CPU Offload: + # output = self.attn(self.layer_name, hidden_states) + # + # 2. MLA 内部会: + # a. 调用 wait_for_kv_layer_from_connector 尝试 swap in + # b. 执行压缩投影 + RMSNorm + RoPE(910C 上为融合算子) + # c. 执行 Attention 计算 + # d. 调用 maybe_save_kv_layer_to_connector 尝试 swap out + # e. 输出投影 + # + # 3. 返回 output + pass + + +class DeepSeekV2MoEMLP(nn.Module): + """DeepSeek-V2 MoE(Mixture of Experts)前馈网络。 + + Args: + config: 模型配置 + layer_idx: 层索引 + """ + + def __init__(self, config, layer_idx: int = 0): + super().__init__() + # --- 实际实现中包含以下组件 --- + # config.num_experts: 总专家数(如 160) + # config.num_experts_per_tok: 每 token 激活的专家数(如 6) + # config.moe_intermediate_size: 专家 FFN 中间维度 + # self.gate: 门控网络,hidden_size → num_experts + # self.experts: nn.ModuleList of FFN experts + # self.shared_expert: 共享专家 FFN(所有 token 都经过) + pass + + def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: + """MoE 前向:门控路由 + 专家并行计算。 + + Args: + hidden_states: 输入,shape = [num_tokens, hidden_size] + + Returns: + output: MoE 输出,shape = [num_tokens, hidden_size] + """ + # --- 执行逻辑 --- + # 1. 门控路由: + # router_logits = self.gate(hidden_states) # [num_tokens, num_experts] + # topk_weights, topk_ids = topk(router_logits, k=num_experts_per_tok) + # + # 2. 专家并行计算(在昇腾 910C 上可利用多 AI Core 并行): + # expert_output = sum( + # weight_i * expert_i(hidden_states) + # for weight_i, expert_i in selected_experts + # ) + # + # 3. 共享专家: + # shared_output = self.shared_expert(hidden_states) + # + # 4. 合并:output = expert_output + shared_output + # 5. 返回 output + pass + + +class DeepSeekV2DecoderLayer(nn.Module): + """DeepSeek-V2 解码器层:MLA Attention + MoE/Dense MLP。 + + Args: + config: 模型配置 + layer_idx: 层索引,传递给 Attention 以生成 layer_name + """ + + def __init__(self, config, layer_idx: int = 0): + super().__init__() + self.self_attn = DeepSeekV2Attention(config, layer_idx=layer_idx) + + # DeepSeek-V2 中部分层使用 MoE,部分使用 Dense MLP + # 由 config.moe_layer_freq 和 config.first_k_dense_replace 决定 + self.mlp = DeepSeekV2MoEMLP(config, layer_idx=layer_idx) + + self.input_layernorm = None # RMSNorm(hidden_size) + self.post_attention_layernorm = None # RMSNorm(hidden_size) + + def forward( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + residual: torch.Tensor | None, + ) -> tuple[torch.Tensor, torch.Tensor]: + """解码器层前向。 + + Args: + positions: 位置编码 + hidden_states: 输入隐藏状态 + residual: 残差连接张量 + + Returns: + hidden_states: 输出隐藏状态 + residual: 更新后的残差张量 + """ + # --- 执行逻辑 --- + # 1. 第一个残差连接 + LayerNorm: + # if residual is None: + # hidden_states, residual = input_layernorm(hidden_states), hidden_states + # else: + # hidden_states, residual = input_layernorm(hidden_states, residual) + # + # 2. MLA 注意力(内含 CPU Offload 钩子): + # hidden_states = self.self_attn(positions, hidden_states) + # + # 3. 第二个残差连接 + LayerNorm: + # hidden_states, residual = post_attention_layernorm(hidden_states, residual) + # + # 4. MoE / Dense MLP: + # hidden_states = self.mlp(hidden_states) + # + # 5. 返回 (hidden_states, residual) + pass + + +class DeepSeekV2ForCausalLM(nn.Module): + """DeepSeek-V2 因果语言模型。 + + 完整模型结构: + Embedding → [DecoderLayer × N] → RMSNorm → LMHead + + MLA + CPU Offload 集成: + 每个 DecoderLayer 通过 layer_idx 生成唯一 layer_name, + MLA 的压缩 KV Cache (kv_c, k_pe) 在 prefill 后可按层卸载到 CPU, + decode 时按需 swap in。 + + Args: + config: DeepSeek-V2 模型配置 + """ + + packed_modules_mapping = { + "q_a_proj": ("fused_qkv_a_proj", "q"), + "kv_a_proj_with_mqa": ("fused_qkv_a_proj", "kv"), + "gate_proj": ("gate_up_proj", 0), + "up_proj": ("gate_up_proj", 1), + } + + def __init__(self, config): + super().__init__() + # self.embed_tokens = VocabParallelEmbedding(config.vocab_size, config.hidden_size) + # self.layers = nn.ModuleList([ + # DeepSeekV2DecoderLayer(config, layer_idx=i) + # for i in range(config.num_hidden_layers) + # ]) + # self.norm = RMSNorm(config.hidden_size) + # self.lm_head = ParallelLMHead(config.vocab_size, config.hidden_size) + pass + + def forward( + self, + input_ids: torch.Tensor, + positions: torch.Tensor, + ) -> torch.Tensor: + """模型前向传播。 + + Args: + input_ids: 输入 token ID,shape = [num_tokens] + positions: 位置信息,shape = [num_tokens] + + Returns: + hidden_states: 最后一层的隐藏状态,shape = [num_tokens, hidden_size] + """ + # --- 执行逻辑 --- + # 1. hidden_states = embed_tokens(input_ids) + # 2. residual = None + # 3. for layer in self.layers: + # hidden_states, residual = layer(positions, hidden_states, residual) + # 每一层的 MLA Attention 内部会: + # - wait_for_kv_layer_from_connector(layer_name, kv_c_cache, k_pe_cache) + # - 执行压缩投影 + 融合 RMSNorm+RoPE+Cache 写入 + # - Flash Attention 计算 + # - maybe_save_kv_layer_to_connector(layer_name, kv_c_cache, k_pe_cache) + # 4. hidden_states, _ = norm(hidden_states, residual) + # 5. return hidden_states + pass + + def compute_logits(self, hidden_states: torch.Tensor) -> torch.Tensor: + """计算词表 logits。 + + Args: + hidden_states: shape = [num_tokens, hidden_size] + + Returns: + logits: shape = [num_tokens, vocab_size] + """ + # return self.lm_head(hidden_states) + pass diff --git a/nanovllm/models/qwen3.py b/nanovllm/models/qwen3.py index 5d39e0b90..84480a964 100755 --- a/nanovllm/models/qwen3.py +++ b/nanovllm/models/qwen3.py @@ -24,6 +24,7 @@ def __init__( qkv_bias: bool = False, rope_theta: float = 10000, rope_scaling: tuple | None = None, + layer_name: str = "", ) -> None: super().__init__() tp_size = dist.get_world_size() @@ -63,6 +64,7 @@ def __init__( self.head_dim, self.scaling, self.num_kv_heads, + layer_name=layer_name, ) if not self.qkv_bias: self.q_norm = RMSNorm(self.head_dim, eps=rms_norm_eps) @@ -121,6 +123,7 @@ class Qwen3DecoderLayer(nn.Module): def __init__( self, config: Qwen3Config, + layer_idx: int = 0, ) -> None: super().__init__() self.self_attn = Qwen3Attention( @@ -133,6 +136,7 @@ def __init__( head_dim=getattr(config, 'head_dim', None), rope_theta=getattr(config, "rope_theta", 1000000), rope_scaling=getattr(config, "rope_scaling", None), + layer_name=f"layers.{layer_idx}.self_attn", ) self.mlp = Qwen3MLP( hidden_size=config.hidden_size, @@ -166,7 +170,7 @@ def __init__( ) -> None: super().__init__() self.embed_tokens = VocabParallelEmbedding(config.vocab_size, config.hidden_size) - self.layers = nn.ModuleList([Qwen3DecoderLayer(config) for _ in range(config.num_hidden_layers)]) + self.layers = nn.ModuleList([Qwen3DecoderLayer(config, layer_idx=i) for i in range(config.num_hidden_layers)]) self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) def forward(