From 722eaa2c2721e4f19d1ab79b0e0413ee4a8ef517 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 12 Apr 2026 12:31:45 +0000 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20add=20CPU=20offload=20connector=20s?= =?UTF-8?q?tubs=20for=20KV=20cache=20GPU=E2=86=94CPU=20transfer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port the architecture from vllm-ascend PR #1659 into nano-vllm as minimal stub functions with Chinese comments describing the execution logic. This adds: - nanovllm/distributed/: CPUOffloadConnector, CPUKVCacheManager, OffloadMetadata and SwapRequest data structures - nanovllm/layers/attention_utils.py: wait_for_kv_layer_from_connector and maybe_save_kv_layer_to_connector hooks - Config.kv_transfer_config: new KVTransferConfig dataclass - Attention layer integration: layer_name propagation from Qwen3DecoderLayer → Qwen3Attention → Attention, with offload hooks in Attention.forward() - ModelRunner._maybe_init_cpu_offload_connector() initialization stub https://claude.ai/code/session_01WTTVHqXWMtHAQt5picfcig --- nanovllm/config.py | 17 ++ nanovllm/distributed/__init__.py | 1 + nanovllm/distributed/cpu_offload_connector.py | 163 ++++++++++++++++ .../cpu_offload_manager/__init__.py | 2 + .../cpu_kv_cache_manager.py | 176 ++++++++++++++++++ .../cpu_offload_manager/metadata.py | 71 +++++++ nanovllm/engine/model_runner.py | 20 ++ nanovllm/layers/attention.py | 15 +- nanovllm/layers/attention_utils.py | 88 +++++++++ nanovllm/models/qwen3.py | 6 +- 10 files changed, 557 insertions(+), 2 deletions(-) create mode 100644 nanovllm/distributed/__init__.py create mode 100644 nanovllm/distributed/cpu_offload_connector.py create mode 100644 nanovllm/distributed/cpu_offload_manager/__init__.py create mode 100644 nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py create mode 100644 nanovllm/distributed/cpu_offload_manager/metadata.py create mode 100644 nanovllm/layers/attention_utils.py diff --git a/nanovllm/config.py b/nanovllm/config.py index 959ffb357..329ca9eb0 100644 --- a/nanovllm/config.py +++ b/nanovllm/config.py @@ -3,6 +3,22 @@ from transformers import AutoConfig +@dataclass +class KVTransferConfig: + """KV Cache 传输配置(CPU Offload 等场景)。 + + Attributes: + kv_connector: 连接器类名,如 "CPUOffloadConnector" + kv_role: 连接器角色,"kv_both" 表示同时承担 send/recv + swap_in_threshold: swap in 触发阈值 + cpu_swap_space_gb: CPU 端交换空间大小(GB) + """ + kv_connector: str = "" + kv_role: str = "kv_both" + swap_in_threshold: int = 0 + cpu_swap_space_gb: float = 100.0 + + @dataclass class Config: model: str @@ -16,6 +32,7 @@ class Config: eos: int = -1 kvcache_block_size: int = 256 num_kvcache_blocks: int = -1 + kv_transfer_config: KVTransferConfig | None = None def __post_init__(self): assert os.path.isdir(self.model) diff --git a/nanovllm/distributed/__init__.py b/nanovllm/distributed/__init__.py new file mode 100644 index 000000000..b53738650 --- /dev/null +++ b/nanovllm/distributed/__init__.py @@ -0,0 +1 @@ +from nanovllm.distributed.cpu_offload_connector import CPUOffloadConnector diff --git a/nanovllm/distributed/cpu_offload_connector.py b/nanovllm/distributed/cpu_offload_connector.py new file mode 100644 index 000000000..919cb336c --- /dev/null +++ b/nanovllm/distributed/cpu_offload_connector.py @@ -0,0 +1,163 @@ +"""CPU Offload Connector 主模块。 + +实现 GPU KV Cache 到 CPU(Host DRAM)的卸载与加载连接器。 +在 Connector 内部启动 metadata server,管理 CPU KV Cache 并提供 +RPC 调用接口,支持多个 data-parallel EngineCore 实例之间共享 KV Cache。 +对应 vllm-ascend PR #1659 中的 cpu_offload_connector.py。 +""" + +import torch + +from nanovllm.distributed.cpu_offload_manager.cpu_kv_cache_manager import CPUKVCacheManager +from nanovllm.distributed.cpu_offload_manager.metadata import ( + OffloadMetadata, + SwapRequest, + build_offload_metadata, +) + + +class CPUOffloadConnector: + """KV Cache CPU 卸载连接器。 + + 负责协调 GPU 端 Attention 层与 CPU 端 KV Cache Manager 之间的数据传输。 + 支持在 prefill 完成后将 KV Cache 卸载到 CPU,以及在需要时从 CPU 换回 GPU。 + + Args: + num_layers: 模型 Transformer 层数 + block_size: 每个 KV block 的 token 数 + num_kv_heads: KV head 数量 + head_dim: 每个 head 的维度 + dtype: 数据类型 + swap_in_threshold: swap in 触发阈值(GPU 空闲 block 低于此值时触发) + cpu_swap_space_gb: CPU 端预分配的交换空间(GB) + """ + + def __init__( + self, + num_layers: int, + block_size: int, + num_kv_heads: int, + head_dim: int, + dtype: torch.dtype = torch.float16, + swap_in_threshold: int = 0, + cpu_swap_space_gb: float = 100.0, + ): + self.num_layers = num_layers + self.block_size = block_size + self.num_kv_heads = num_kv_heads + self.head_dim = head_dim + self.dtype = dtype + self.swap_in_threshold = swap_in_threshold + self.cpu_swap_space_gb = cpu_swap_space_gb + + self.manager: CPUKVCacheManager | None = None # 延迟初始化 + self.pending_swap_out: list[SwapRequest] = [] # 待执行的 swap out 请求队列 + self.pending_swap_in: list[SwapRequest] = [] # 待执行的 swap in 请求队列 + self._is_initialized: bool = False + + def initialize(self) -> None: + """初始化连接器:创建 CPU KV Cache Manager 并启动 metadata server。 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 创建 CPUKVCacheManager 实例,传入配置参数 + # 2. 调用 manager.init_pool() 在 CPU 内存上预分配 KV Cache 存储池 + # 3. 启动 metadata server(RPC 服务),监听来自其他 DP 实例的请求 + # - 绑定端口,注册 swap_out / swap_in / query 等 RPC 方法 + # - 使 metadata 和 CPU KV Cache 可被多个 DP EngineCore 共享 + # 4. 设置 _is_initialized = True + pass + + def send_kv_caches_and_hidden_states( + self, + layer_name: str, + seq_id: int, + gpu_k_cache: torch.Tensor, + gpu_v_cache: torch.Tensor, + block_ids: list[int], + token_length: int, + is_prefill: bool, + ) -> bool: + """在 prefill 完成后将指定层的 KV Cache 卸载到 CPU(swap out)。 + + Args: + layer_name: 模型层名称,如 "layers.0.self_attn" + seq_id: 序列 ID + gpu_k_cache: GPU 端该层的 K cache 张量 + gpu_v_cache: GPU 端该层的 V cache 张量 + block_ids: 需要卸载的 GPU block ID 列表 + token_length: 序列当前 token 长度 + is_prefill: 是否处于 prefill 阶段 + + Returns: + bool: 卸载是否成功 + """ + # --- 执行逻辑 --- + # 1. 构建 OffloadMetadata(调用 build_offload_metadata) + # 2. 创建 SwapRequest,direction="swap_out" + # 3. 调用 manager.swap_out(gpu_k_cache, gpu_v_cache, metadata) + # 将 GPU 上的 KV Cache 异步拷贝到 CPU pin_memory 区域 + # 4. 返回操作是否成功 + return False + + def recv_kv_caches_and_hidden_states( + self, + layer_name: str, + seq_id: int, + gpu_k_cache: torch.Tensor, + gpu_v_cache: torch.Tensor, + block_ids: list[int], + token_length: int, + is_prefill: bool, + ) -> bool: + """在需要时将 KV Cache 从 CPU 加载回 GPU(swap in)。 + + Args: + layer_name: 模型层名称 + seq_id: 序列 ID + gpu_k_cache: GPU 端该层的 K cache 张量(写入目标) + gpu_v_cache: GPU 端该层的 V cache 张量(写入目标) + block_ids: 需要加载的 GPU block ID 列表 + token_length: 序列当前 token 长度 + is_prefill: 是否处于 prefill 阶段 + + Returns: + bool: 加载是否成功 + """ + # --- 执行逻辑 --- + # 1. 构建 OffloadMetadata + # 2. 检查 manager 中是否缓存了对应的 block + # 若未缓存,跳过(可能该 block 尚未被 swap out 过) + # 3. 调用 manager.swap_in(gpu_k_cache, gpu_v_cache, metadata) + # 将 CPU 上的 KV Cache 拷贝回 GPU + # 4. 返回操作是否成功 + return False + + def should_swap_in(self, num_free_gpu_blocks: int) -> bool: + """判断是否应触发 swap in 操作。 + + Args: + num_free_gpu_blocks: GPU 端当前空闲的 block 数量 + + Returns: + bool: 是否需要从 CPU 换入 KV Cache 到 GPU + """ + # --- 执行逻辑 --- + # 若 GPU 空闲 block 数低于 swap_in_threshold,返回 True + # 否则返回 False + return False + + def close(self) -> None: + """关闭连接器,释放 CPU 资源并停止 metadata server。 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 停止 metadata server(RPC 服务),关闭端口 + # 2. 释放 manager 中的 CPU KV Cache 内存池 + # 3. 清空 pending_swap_out 和 pending_swap_in 队列 + # 4. 设置 _is_initialized = False + pass diff --git a/nanovllm/distributed/cpu_offload_manager/__init__.py b/nanovllm/distributed/cpu_offload_manager/__init__.py new file mode 100644 index 000000000..f5f64af56 --- /dev/null +++ b/nanovllm/distributed/cpu_offload_manager/__init__.py @@ -0,0 +1,2 @@ +from nanovllm.distributed.cpu_offload_manager.cpu_kv_cache_manager import CPUKVCacheManager +from nanovllm.distributed.cpu_offload_manager.metadata import OffloadMetadata diff --git a/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py b/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py new file mode 100644 index 000000000..fbc3be205 --- /dev/null +++ b/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py @@ -0,0 +1,176 @@ +"""CPU 端 KV Cache 管理器。 + +负责在 Host DRAM 上分配、存储和检索被卸载的 KV Cache 数据, +并通过 RPC 接口供多个 data-parallel EngineCore 实例共享访问。 +对应 vllm-ascend PR #1659 中的 cpu_kv_cache_manager.py。 +""" + +import torch + +from nanovllm.distributed.cpu_offload_manager.metadata import OffloadMetadata + + +class CPUKVCacheManager: + """管理 CPU(Host DRAM)上的 KV Cache 存储池。 + + 该管理器在 CPU 内存中维护一块预分配的 KV Cache 空间, + 支持按 (seq_id, layer_name, block_id) 三元组进行存取。 + + Args: + cpu_swap_space_gb: CPU 端预分配的交换空间大小(GB) + num_layers: 模型的 Transformer 层数 + block_size: 每个 KV block 包含的 token 数 + num_kv_heads: KV head 数量 + head_dim: 每个 head 的维度 + dtype: 数据类型 + """ + + def __init__( + self, + cpu_swap_space_gb: float, + num_layers: int, + block_size: int, + num_kv_heads: int, + head_dim: int, + dtype: torch.dtype = torch.float16, + ): + self.cpu_swap_space_gb = cpu_swap_space_gb + self.num_layers = num_layers + self.block_size = block_size + self.num_kv_heads = num_kv_heads + self.head_dim = head_dim + self.dtype = dtype + + # --- 以下属性在 init_pool 中实际分配 --- + self.cpu_k_cache: torch.Tensor | None = None # CPU 端 K cache 存储 + self.cpu_v_cache: torch.Tensor | None = None # CPU 端 V cache 存储 + self.num_cpu_blocks: int = 0 # CPU 端总 block 数 + self.free_cpu_block_ids: list[int] = [] # 空闲 CPU block 列表 + self.block_mapping: dict[tuple, int] = {} # (seq_id, layer, gpu_block) → cpu_block 映射 + + def init_pool(self) -> None: + """在 CPU 内存中预分配 KV Cache 存储池。 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 根据 cpu_swap_space_gb 计算可容纳的 CPU block 总数: + # block_bytes = 2 * block_size * num_kv_heads * head_dim * dtype.itemsize + # num_cpu_blocks = cpu_swap_space_gb * 1024^3 / (num_layers * block_bytes) + # 2. 分配 cpu_k_cache: shape = [num_layers, num_cpu_blocks, block_size, num_kv_heads, head_dim] + # 分配 cpu_v_cache: 同上,均使用 pin_memory 以加速 GPU↔CPU 传输 + # 3. 初始化 free_cpu_block_ids = list(range(num_cpu_blocks)) + pass + + def allocate_cpu_block(self) -> int: + """从空闲池中分配一个 CPU block。 + + Returns: + int: 分配到的 CPU block ID,若无可用 block 则返回 -1 + """ + # --- 执行逻辑 --- + # 1. 检查 free_cpu_block_ids 是否非空 + # 2. 若非空,pop 出一个 block_id 并返回 + # 3. 若为空,返回 -1 表示 CPU 空间不足 + return -1 + + def free_cpu_block(self, cpu_block_id: int) -> None: + """释放一个 CPU block,归还到空闲池。 + + Args: + cpu_block_id: 要释放的 CPU block ID + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 将 cpu_block_id 追加到 free_cpu_block_ids + # 2. 从 block_mapping 中删除对应的映射条目 + pass + + def swap_out( + self, + gpu_k_cache: torch.Tensor, + gpu_v_cache: torch.Tensor, + metadata: OffloadMetadata, + ) -> bool: + """将指定层的 KV Cache 从 GPU 卸载到 CPU(swap out)。 + + Args: + gpu_k_cache: GPU 端该层的 K cache,shape = [num_blocks, block_size, num_kv_heads, head_dim] + gpu_v_cache: GPU 端该层的 V cache,shape 同上 + metadata: 描述本次卸载的元数据(包含 seq_id, layer_name, block_ids 等) + + Returns: + bool: 卸载是否成功(CPU 空间不足时返回 False) + """ + # --- 执行逻辑 --- + # 1. 解析 metadata 获取 layer_index 和 block_ids + # 2. 对每个 gpu_block_id in block_ids: + # a. 调用 allocate_cpu_block() 获取 cpu_block_id + # b. 若分配失败,回滚已分配的 block 并返回 False + # c. 用非阻塞拷贝将 gpu_k_cache[gpu_block_id] → cpu_k_cache[layer][cpu_block_id] + # d. 同理拷贝 V cache + # e. 记录 block_mapping[(seq_id, layer, gpu_block_id)] = cpu_block_id + # 3. 同步 CUDA stream 确保拷贝完成 + # 4. 返回 True + return False + + def swap_in( + self, + gpu_k_cache: torch.Tensor, + gpu_v_cache: torch.Tensor, + metadata: OffloadMetadata, + ) -> bool: + """将指定层的 KV Cache 从 CPU 加载回 GPU(swap in)。 + + Args: + gpu_k_cache: GPU 端该层的 K cache(目标写入位置) + gpu_v_cache: GPU 端该层的 V cache(目标写入位置) + metadata: 描述本次加载的元数据 + + Returns: + bool: 加载是否成功(找不到对应 CPU block 时返回 False) + """ + # --- 执行逻辑 --- + # 1. 解析 metadata 获取 layer_index, seq_id, block_ids + # 2. 对每个 gpu_block_id in block_ids: + # a. 通过 block_mapping 查找对应的 cpu_block_id + # b. 若找不到,说明数据不在 CPU 上,返回 False + # c. 用非阻塞拷贝将 cpu_k_cache[layer][cpu_block_id] → gpu_k_cache[gpu_block_id] + # d. 同理拷贝 V cache + # e. 释放已加载的 cpu_block(调用 free_cpu_block) + # 3. 同步 CUDA stream 确保拷贝完成 + # 4. 返回 True + return False + + def has_cached(self, seq_id: int, layer_name: str, block_id: int) -> bool: + """查询某个 block 是否已被缓存在 CPU 端。 + + Args: + seq_id: 序列 ID + layer_name: 层名称 + block_id: GPU 端 block ID + + Returns: + bool: 是否存在于 CPU cache 中 + """ + # --- 执行逻辑 --- + # 检查 (seq_id, layer_name, block_id) 是否在 block_mapping 中 + return False + + def clear_seq(self, seq_id: int) -> None: + """清除某个序列在 CPU 端的所有缓存 block。 + + Args: + seq_id: 要清除的序列 ID + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 遍历 block_mapping,找出所有 key 中 seq_id 匹配的条目 + # 2. 对每个匹配条目调用 free_cpu_block 释放 + # 3. 从 block_mapping 中删除这些条目 + pass diff --git a/nanovllm/distributed/cpu_offload_manager/metadata.py b/nanovllm/distributed/cpu_offload_manager/metadata.py new file mode 100644 index 000000000..eabd7fe32 --- /dev/null +++ b/nanovllm/distributed/cpu_offload_manager/metadata.py @@ -0,0 +1,71 @@ +"""CPU Offload 元数据定义模块。 + +定义 KV Cache 在 GPU 与 CPU 之间卸载时所需的元数据结构。 +对应 vllm-ascend PR #1659 中的 metadata.py。 +""" + +from dataclasses import dataclass, field + + +@dataclass +class OffloadMetadata: + """单次 KV Cache 卸载/加载操作的元数据。 + + Attributes: + seq_id: 请求序列的唯一标识 + layer_name: 模型层名称,如 "layers.0.self_attn" + block_ids: 需要卸载/加载的物理 block ID 列表 + token_length: 该序列当前的 token 总长度 + is_prefill: 当前是否处于 prefill 阶段 + """ + seq_id: int = -1 + layer_name: str = "" + block_ids: list[int] = field(default_factory=list) + token_length: int = 0 + is_prefill: bool = False + + +@dataclass +class SwapRequest: + """描述一次 GPU ↔ CPU 交换请求。 + + Attributes: + metadata: 本次交换对应的元数据 + direction: 交换方向,"swap_out" 表示 GPU→CPU,"swap_in" 表示 CPU→GPU + priority: 优先级,数值越小优先级越高 + """ + metadata: OffloadMetadata = field(default_factory=OffloadMetadata) + direction: str = "swap_out" + priority: int = 0 + + +def build_offload_metadata( + seq_id: int, + layer_name: str, + block_ids: list[int], + token_length: int, + is_prefill: bool, +) -> OffloadMetadata: + """根据当前推理状态构建卸载元数据。 + + Args: + seq_id: 请求序列 ID + layer_name: 模型层名称 + block_ids: 涉及的物理 block ID 列表 + token_length: 序列当前 token 总长度 + is_prefill: 是否处于 prefill 阶段 + + Returns: + OffloadMetadata: 填充完毕的元数据对象 + """ + # --- 执行逻辑 --- + # 1. 将传入参数直接封装为 OffloadMetadata 数据类 + # 2. 返回该元数据实例,供 Connector 和 Manager 在后续 + # swap_out / swap_in 操作中使用 + return OffloadMetadata( + seq_id=seq_id, + layer_name=layer_name, + block_ids=block_ids, + token_length=token_length, + is_prefill=is_prefill, + ) diff --git a/nanovllm/engine/model_runner.py b/nanovllm/engine/model_runner.py index f66c38efd..c94881c47 100644 --- a/nanovllm/engine/model_runner.py +++ b/nanovllm/engine/model_runner.py @@ -8,6 +8,7 @@ from nanovllm.engine.sequence import Sequence from nanovllm.models.qwen3 import Qwen3ForCausalLM from nanovllm.layers.sampler import Sampler +from nanovllm.layers.attention_utils import set_connector from nanovllm.utils.context import set_context, get_context, reset_context from nanovllm.utils.loader import load_model @@ -33,6 +34,7 @@ def __init__(self, config: Config, rank: int, event: Event | list[Event]): self.sampler = Sampler() self.warmup_model() self.allocate_kv_cache() + self._maybe_init_cpu_offload_connector() if not self.enforce_eager: self.capture_cudagraph() torch.set_default_device("cpu") @@ -117,6 +119,24 @@ def allocate_kv_cache(self): module.v_cache = self.kv_cache[1, layer_id] layer_id += 1 + def _maybe_init_cpu_offload_connector(self) -> None: + """根据配置决定是否初始化 CPU Offload Connector。 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 检查 config.kv_transfer_config 是否为 None,若是则跳过 + # 2. 若配置了 kv_connector == "CPUOffloadConnector": + # a. 从 config 和 hf_config 中提取 num_layers, block_size, + # num_kv_heads, head_dim, dtype 等参数 + # b. 创建 CPUOffloadConnector 实例 + # c. 调用 connector.initialize() 分配 CPU 内存池并启动 metadata server + # d. 调用 set_connector(connector) 设置全局引用, + # 使 Attention 层的 wait_for_kv_layer / maybe_save_kv_layer 能访问到 + # 3. 若不匹配已知 connector 类型,忽略 + pass + def prepare_block_tables(self, seqs: list[Sequence]): max_len = max(len(seq.block_table) for seq in seqs) block_tables = [seq.block_table + [-1] * (max_len - len(seq.block_table)) for seq in seqs] diff --git a/nanovllm/layers/attention.py b/nanovllm/layers/attention.py index e416139ea..f312e8b5a 100644 --- a/nanovllm/layers/attention.py +++ b/nanovllm/layers/attention.py @@ -5,6 +5,10 @@ from flash_attn import flash_attn_varlen_func, flash_attn_with_kvcache from nanovllm.utils.context import get_context +from nanovllm.layers.attention_utils import ( + wait_for_kv_layer_from_connector, + maybe_save_kv_layer_to_connector, +) @triton.jit @@ -48,17 +52,23 @@ def __init__( head_dim, scale, num_kv_heads, + layer_name: str = "", ): super().__init__() self.num_heads = num_heads self.head_dim = head_dim self.scale = scale self.num_kv_heads = num_kv_heads + self.layer_name = layer_name self.k_cache = self.v_cache = torch.tensor([]) def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor): context = get_context() k_cache, v_cache = self.k_cache, self.v_cache + + # --- CPU Offload: prefill 前尝试从 CPU 换入该层 KV Cache --- + wait_for_kv_layer_from_connector(self.layer_name, k_cache, v_cache) + if k_cache.numel() and v_cache.numel(): store_kvcache(k, v, k_cache, v_cache, context.slot_mapping) if context.is_prefill: @@ -68,8 +78,11 @@ def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor): max_seqlen_q=context.max_seqlen_q, cu_seqlens_q=context.cu_seqlens_q, max_seqlen_k=context.max_seqlen_k, cu_seqlens_k=context.cu_seqlens_k, softmax_scale=self.scale, causal=True, block_table=context.block_tables) + + # --- CPU Offload: prefill 后尝试将该层 KV Cache 卸载到 CPU --- + maybe_save_kv_layer_to_connector(self.layer_name, k_cache, v_cache) else: # decode o = flash_attn_with_kvcache(q.unsqueeze(1), k_cache, v_cache, - cache_seqlens=context.context_lens, block_table=context.block_tables, + cache_seqlens=context.context_lens, block_table=context.block_tables, softmax_scale=self.scale, causal=True) return o diff --git a/nanovllm/layers/attention_utils.py b/nanovllm/layers/attention_utils.py new file mode 100644 index 000000000..69b71ab98 --- /dev/null +++ b/nanovllm/layers/attention_utils.py @@ -0,0 +1,88 @@ +"""Attention 层与 CPU Offload Connector 的集成工具函数。 + +提供在 Attention forward 前后调用的钩子函数,用于: +- prefill 前:从 CPU 换入(swap in)该层 KV Cache +- prefill 后:将该层 KV Cache 卸载(swap out)到 CPU + +对应 vllm-ascend PR #1659 中从 attention_v1.py 提取到 utils.py 的两个函数。 +""" + +import torch + +# 全局 connector 引用,由 ModelRunner 初始化时设置 +_CONNECTOR = None + + +def set_connector(connector) -> None: + """设置全局 CPU Offload Connector 实例。 + + Args: + connector: CPUOffloadConnector 实例,或 None 表示不启用 offload + """ + global _CONNECTOR + _CONNECTOR = connector + + +def get_connector(): + """获取全局 CPU Offload Connector 实例。 + + Returns: + CPUOffloadConnector | None + """ + return _CONNECTOR + + +def wait_for_kv_layer_from_connector( + layer_name: str, + k_cache: torch.Tensor, + v_cache: torch.Tensor, +) -> None: + """在 Attention forward 之前调用,等待从 CPU 换入该层的 KV Cache。 + + 若未启用 CPU Offload(connector 为 None),则直接返回不做任何操作。 + + Args: + layer_name: 当前层名称,如 "layers.0.self_attn" + k_cache: GPU 端该层的 K cache 张量 + v_cache: GPU 端该层的 V cache 张量 + + Returns: + None(原地写入 k_cache / v_cache) + """ + # --- 执行逻辑 --- + # 1. 检查全局 _CONNECTOR 是否为 None,若是则直接 return + # 2. 从当前推理上下文(get_context())中获取 seq_id、block_ids 等信息 + # 3. 检查该层对应的 block 是否已被 swap out 到 CPU + # 4. 若是,调用 connector.recv_kv_caches_and_hidden_states( + # layer_name, seq_id, k_cache, v_cache, block_ids, token_length, is_prefill) + # 将 CPU 上缓存的 KV 数据拷贝回 GPU 端的 k_cache / v_cache + # 5. 等待异步拷贝完成(CUDA stream 同步) + pass + + +def maybe_save_kv_layer_to_connector( + layer_name: str, + k_cache: torch.Tensor, + v_cache: torch.Tensor, +) -> None: + """在 prefill Attention forward 之后调用,尝试将该层 KV Cache 卸载到 CPU。 + + 仅在 prefill 阶段且启用了 CPU Offload 时生效。 + + Args: + layer_name: 当前层名称 + k_cache: GPU 端该层的 K cache 张量 + v_cache: GPU 端该层的 V cache 张量 + + Returns: + None + """ + # --- 执行逻辑 --- + # 1. 检查全局 _CONNECTOR 是否为 None,若是则直接 return + # 2. 从当前推理上下文中获取 seq_id、block_ids、token_length、is_prefill + # 3. 仅在 is_prefill == True 时执行卸载 + # 4. 调用 connector.send_kv_caches_and_hidden_states( + # layer_name, seq_id, k_cache, v_cache, block_ids, token_length, is_prefill) + # 通过异步拷贝将 GPU 上的 KV Cache 数据写入 CPU pin_memory 区域 + # 5. 无需等待完成——异步执行以减少对推理延迟的影响 + pass diff --git a/nanovllm/models/qwen3.py b/nanovllm/models/qwen3.py index 5d39e0b90..84480a964 100755 --- a/nanovllm/models/qwen3.py +++ b/nanovllm/models/qwen3.py @@ -24,6 +24,7 @@ def __init__( qkv_bias: bool = False, rope_theta: float = 10000, rope_scaling: tuple | None = None, + layer_name: str = "", ) -> None: super().__init__() tp_size = dist.get_world_size() @@ -63,6 +64,7 @@ def __init__( self.head_dim, self.scaling, self.num_kv_heads, + layer_name=layer_name, ) if not self.qkv_bias: self.q_norm = RMSNorm(self.head_dim, eps=rms_norm_eps) @@ -121,6 +123,7 @@ class Qwen3DecoderLayer(nn.Module): def __init__( self, config: Qwen3Config, + layer_idx: int = 0, ) -> None: super().__init__() self.self_attn = Qwen3Attention( @@ -133,6 +136,7 @@ def __init__( head_dim=getattr(config, 'head_dim', None), rope_theta=getattr(config, "rope_theta", 1000000), rope_scaling=getattr(config, "rope_scaling", None), + layer_name=f"layers.{layer_idx}.self_attn", ) self.mlp = Qwen3MLP( hidden_size=config.hidden_size, @@ -166,7 +170,7 @@ def __init__( ) -> None: super().__init__() self.embed_tokens = VocabParallelEmbedding(config.vocab_size, config.hidden_size) - self.layers = nn.ModuleList([Qwen3DecoderLayer(config) for _ in range(config.num_hidden_layers)]) + self.layers = nn.ModuleList([Qwen3DecoderLayer(config, layer_idx=i) for i in range(config.num_hidden_layers)]) self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) def forward( From 156d560572b13d09ee389a51c9dd0e56bcf351e2 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 12 Apr 2026 15:47:39 +0000 Subject: [PATCH 2/3] feat: add Ascend 910C scatter+burst transfer and MLA attention stubs Ascend DaVinci architecture optimizations: - npu_ops.py: scatter/gather for packing small KV blocks into contiguous buffers, burst DMA for single-transaction transfer, fused npu_kv_rmsnorm_rope_cache operator - hccl_utils.py: HCCL communication stubs (AllGather, ReduceScatter) - CPUOffloadConnector: ascend_swap_out_with_scatter_burst and ascend_swap_in_with_burst_gather methods - CPUKVCacheManager: store_packed_blocks / load_packed_blocks - attention_utils.py: auto-select Ascend burst path via flag MLA (Multi-head Latent Attention) restoration from PR #1659: - mla_attention.py: MLAAttention with compressed KV cache (kv_c + k_pe), q/kv projection stubs, CPU offload hooks - deepseek_v2.py: DeepSeekV2ForCausalLM with MLA + MoE stubs, layer_name propagation for per-layer offload https://claude.ai/code/session_01WTTVHqXWMtHAQt5picfcig --- nanovllm/distributed/cpu_offload_connector.py | 116 +++++++- .../cpu_kv_cache_manager.py | 74 ++++++ nanovllm/layers/ascend/__init__.py | 12 + nanovllm/layers/ascend/hccl_utils.py | 88 +++++++ nanovllm/layers/ascend/npu_ops.py | 184 +++++++++++++ nanovllm/layers/attention_utils.py | 72 +++-- nanovllm/layers/mla_attention.py | 190 +++++++++++++ nanovllm/models/deepseek_v2.py | 249 ++++++++++++++++++ 8 files changed, 962 insertions(+), 23 deletions(-) create mode 100644 nanovllm/layers/ascend/__init__.py create mode 100644 nanovllm/layers/ascend/hccl_utils.py create mode 100644 nanovllm/layers/ascend/npu_ops.py create mode 100644 nanovllm/layers/mla_attention.py create mode 100644 nanovllm/models/deepseek_v2.py diff --git a/nanovllm/distributed/cpu_offload_connector.py b/nanovllm/distributed/cpu_offload_connector.py index 919cb336c..a49af0f47 100644 --- a/nanovllm/distributed/cpu_offload_connector.py +++ b/nanovllm/distributed/cpu_offload_connector.py @@ -1,9 +1,14 @@ """CPU Offload Connector 主模块。 -实现 GPU KV Cache 到 CPU(Host DRAM)的卸载与加载连接器。 +实现 GPU/NPU KV Cache 到 CPU(Host DRAM)的卸载与加载连接器。 在 Connector 内部启动 metadata server,管理 CPU KV Cache 并提供 RPC 调用接口,支持多个 data-parallel EngineCore 实例之间共享 KV Cache。 对应 vllm-ascend PR #1659 中的 cpu_offload_connector.py。 + +昇腾 910C 适配: + 针对达芬奇架构分离式芯片特点,提供 scatter+burst 传输路径—— + 先将分散的小 block 打包为连续 buffer(scatter), + 再以 burst 模式做 DMA 传输,降低调度和编译开销。 """ import torch @@ -149,6 +154,115 @@ def should_swap_in(self, num_free_gpu_blocks: int) -> bool: # 否则返回 False return False + # ================================================================ + # 昇腾 910C 专用:scatter + burst 传输路径 + # ================================================================ + + def ascend_swap_out_with_scatter_burst( + self, + layer_name: str, + seq_id: int, + npu_k_cache: torch.Tensor, + npu_v_cache: torch.Tensor, + block_ids: list[int], + token_length: int, + is_prefill: bool, + ) -> bool: + """昇腾 910C 优化的 swap out:scatter 打包 + burst DMA 传输。 + + 标准路径(逐 block 拷贝)在达芬奇架构上的问题: + - 每个小 block 需要一次独立的 DMA 事务 + - 每次 DMA 事务有 ~5μs 的 AI CPU 调度开销 + - N 个 block → N 次调度 → 调度总开销 = N × 5μs + - 编译器需为每个 block 生成独立的 DMA 指令 + + 本方法的优化策略: + 1. scatter: 在 NPU 的 AI Core 上将 N 个分散 block 打包到连续 buffer + → 仅 1 次算子下发(AI Core 内部并行执行,无 AI CPU 调度) + 2. burst: 对连续 buffer 做 1 次 DMA burst 传输 + → 仅 1 次 DMA 调度,充分利用 HBM→DDR 带宽 + 总调度开销:~5μs(1 次 scatter 算子 + 1 次 DMA),而非 N × 5μs + + Args: + layer_name: 模型层名称 + seq_id: 序列 ID + npu_k_cache: NPU 端该层 K cache + npu_v_cache: NPU 端该层 V cache + block_ids: 需要卸载的 block ID 列表(在 NPU 显存中可能不连续) + token_length: 序列当前 token 长度 + is_prefill: 是否 prefill 阶段 + + Returns: + bool: 卸载是否成功 + """ + # --- 执行逻辑 --- + # 1. 调用 npu_scatter_kv_blocks(npu_k_cache, npu_v_cache, block_ids, block_size) + # 在 AI Core 上将分散的 block 打包到连续的 k_packed, v_packed + # + # 2. 在 CPU 端分配对应大小的 pin_memory buffer: + # cpu_k_buf = torch.empty_like(k_packed, device="cpu", pin_memory=True) + # cpu_v_buf = torch.empty_like(v_packed, device="cpu", pin_memory=True) + # + # 3. 调用 npu_burst_copy_to_cpu(k_packed, cpu_k_buf) 做 burst DMA + # 调用 npu_burst_copy_to_cpu(v_packed, cpu_v_buf) + # + # 4. 将 cpu_k_buf / cpu_v_buf 写入 manager 的 CPU 存储池 + # (按 block 粒度切分后存入 manager.cpu_k_cache / cpu_v_cache) + # + # 5. 记录 metadata 映射关系 + # 6. 返回 True + return False + + def ascend_swap_in_with_burst_gather( + self, + layer_name: str, + seq_id: int, + npu_k_cache: torch.Tensor, + npu_v_cache: torch.Tensor, + block_ids: list[int], + token_length: int, + is_prefill: bool, + ) -> bool: + """昇腾 910C 优化的 swap in:burst DMA 传输 + gather 分发。 + + scatter+burst 的逆操作: + 1. 从 CPU 端拼装连续 buffer + 2. burst DMA 一次性传输到 NPU + 3. gather 分发到各 block 位置 + + Args: + layer_name: 模型层名称 + seq_id: 序列 ID + npu_k_cache: NPU 端该层 K cache(写入目标) + npu_v_cache: NPU 端该层 V cache(写入目标) + block_ids: 需要加载的 block ID 列表 + token_length: 序列当前 token 长度 + is_prefill: 是否 prefill 阶段 + + Returns: + bool: 加载是否成功 + """ + # --- 执行逻辑 --- + # 1. 从 manager 的 CPU 存储池中读取对应 block 数据, + # 拼装为连续的 cpu_k_buf / cpu_v_buf + # + # 2. 在 NPU 端分配对应大小的连续接收 buffer: + # npu_k_buf = torch.empty_like(cpu_k_buf, device="npu") + # npu_v_buf = torch.empty_like(cpu_v_buf, device="npu") + # + # 3. 调用 npu_burst_copy_to_npu(cpu_k_buf, npu_k_buf) 做 burst DMA + # 调用 npu_burst_copy_to_npu(cpu_v_buf, npu_v_buf) + # + # 4. 等待 DMA 传输完成(stream 同步,确保数据到达 NPU) + # + # 5. 调用 npu_gather_kv_blocks(npu_k_buf, npu_v_buf, + # npu_k_cache, npu_v_cache, block_ids, block_size) + # 在 AI Core 上将连续 buffer 分发到各 block 位置 + # + # 6. 释放 CPU 端对应 block,更新 metadata + # 7. 返回 True + return False + def close(self) -> None: """关闭连接器,释放 CPU 资源并停止 metadata server。 diff --git a/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py b/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py index fbc3be205..742811ecf 100644 --- a/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py +++ b/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py @@ -174,3 +174,77 @@ def clear_seq(self, seq_id: int) -> None: # 2. 对每个匹配条目调用 free_cpu_block 释放 # 3. 从 block_mapping 中删除这些条目 pass + + # ================================================================ + # 昇腾 910C 专用:适配 scatter+burst 传输模式 + # ================================================================ + + def store_packed_blocks( + self, + layer_name: str, + seq_id: int, + k_packed: torch.Tensor, + v_packed: torch.Tensor, + block_ids: list[int], + ) -> bool: + """将 scatter 打包后的连续 KV 数据按 block 粒度存入 CPU 存储池。 + + 由 CPUOffloadConnector.ascend_swap_out_with_scatter_burst 在 + burst DMA 传输完成后调用。此时 k_packed/v_packed 已在 CPU pin_memory 上。 + + Args: + layer_name: 层名称 + seq_id: 序列 ID + k_packed: 打包后的连续 K 数据(CPU 端), + shape = [len(block_ids) * block_size, num_kv_heads, head_dim] + v_packed: 打包后的连续 V 数据(CPU 端),shape 同上 + block_ids: 对应的 NPU 端 block ID 列表 + + Returns: + bool: 存储是否成功 + """ + # --- 执行逻辑 --- + # 1. 将 k_packed / v_packed 按 block_size 切分为 len(block_ids) 段 + # 2. 对每段: + # a. 调用 allocate_cpu_block() 获取 cpu_block_id + # b. 若分配失败,回滚并返回 False + # c. 将该段直接拷贝(CPU→CPU memcpy,零拷贝或 pin_memory 内拷贝) + # 到 cpu_k_cache[layer_idx][cpu_block_id] 和 cpu_v_cache 对应位置 + # d. 记录 block_mapping[(seq_id, layer_name, block_ids[i])] = cpu_block_id + # 3. 返回 True + return False + + def load_packed_blocks( + self, + layer_name: str, + seq_id: int, + block_ids: list[int], + ) -> tuple[torch.Tensor, torch.Tensor] | None: + """将 CPU 存储池中的多个 block 拼装为连续 buffer,供 burst DMA 传输回 NPU。 + + 由 CPUOffloadConnector.ascend_swap_in_with_burst_gather 在 + burst DMA 传输前调用。返回的连续 buffer 可直接用于 npu_burst_copy_to_npu。 + + Args: + layer_name: 层名称 + seq_id: 序列 ID + block_ids: 需要加载的 NPU 端 block ID 列表 + + Returns: + (k_packed, v_packed): 拼装好的连续 CPU 张量 + 若任意 block 不在 CPU 缓存中,返回 None + """ + # --- 执行逻辑 --- + # 1. 对每个 block_id in block_ids: + # a. 通过 block_mapping 查找对应的 cpu_block_id + # b. 若找不到,返回 None + # 2. 分配连续 CPU buffer: + # k_packed = torch.empty([len(block_ids) * block_size, num_kv_heads, head_dim], + # pin_memory=True) + # v_packed 同上 + # 3. 对每个 block: + # 将 cpu_k_cache[layer_idx][cpu_block_id] 拷贝到 k_packed 对应段 + # 将 cpu_v_cache[layer_idx][cpu_block_id] 拷贝到 v_packed 对应段 + # 4. 释放已加载的 cpu_block(调用 free_cpu_block) + # 5. 返回 (k_packed, v_packed) + return None diff --git a/nanovllm/layers/ascend/__init__.py b/nanovllm/layers/ascend/__init__.py new file mode 100644 index 000000000..e97d46d28 --- /dev/null +++ b/nanovllm/layers/ascend/__init__.py @@ -0,0 +1,12 @@ +from nanovllm.layers.ascend.npu_ops import ( + npu_scatter_kv_blocks, + npu_gather_kv_blocks, + npu_burst_copy_to_cpu, + npu_burst_copy_to_npu, + npu_kv_rmsnorm_rope_cache, +) +from nanovllm.layers.ascend.hccl_utils import ( + hccl_init_comm_group, + hccl_all_gather, + hccl_reduce_scatter, +) diff --git a/nanovllm/layers/ascend/hccl_utils.py b/nanovllm/layers/ascend/hccl_utils.py new file mode 100644 index 000000000..cfa908ee7 --- /dev/null +++ b/nanovllm/layers/ascend/hccl_utils.py @@ -0,0 +1,88 @@ +"""昇腾 HCCL (Huawei Collective Communication Library) 工具函数。 + +HCCL 是昇腾 NPU 上替代 NCCL 的集合通信库, +用于多卡/多机之间的张量通信(AllGather, ReduceScatter 等)。 +在 MLA + DP 并行场景下,用于跨 EngineCore 实例同步 KV Cache 元数据。 +""" + +import torch + + +def hccl_init_comm_group( + world_size: int, + rank: int, + group_name: str = "default", +) -> object: + """初始化 HCCL 通信组。 + + Args: + world_size: 通信组中的总进程数 + rank: 当前进程在通信组中的序号 + group_name: 通信组名称,用于区分不同用途的通信组 + 如 "dp_group"(数据并行)、"kv_transfer_group"(KV 传输) + + Returns: + comm_group: HCCL 通信组句柄,供后续通信操作使用 + """ + # --- 执行逻辑 --- + # 1. 调用 torch_npu.distributed 接口初始化进程组: + # torch.distributed.init_process_group( + # backend="hccl", world_size=world_size, rank=rank) + # 2. 创建命名子组: + # group = torch.distributed.new_group(ranks=list(range(world_size)), backend="hccl") + # 3. 验证 HCCL 链路连通性(ping-pong 测试) + # 4. 返回通信组句柄 + return None + + +def hccl_all_gather( + input_tensor: torch.Tensor, + comm_group: object, + world_size: int, +) -> torch.Tensor: + """通过 HCCL 执行 AllGather 操作。 + + 在 MLA 中,fused_qkv_a_proj 的输出 (q_c, kv_no_split) + 可能需要跨 TP rank 做 AllGather 以获取完整的隐向量。 + + Args: + input_tensor: 当前 rank 的局部张量,shape = [num_tokens, local_dim] + comm_group: HCCL 通信组句柄 + world_size: 通信组大小 + + Returns: + gathered: AllGather 后的完整张量,shape = [num_tokens, local_dim * world_size] + """ + # --- 执行逻辑 --- + # 1. 分配输出 buffer:shape = [num_tokens, local_dim * world_size] + # 2. 调用 HCCL AllGather: + # torch.distributed.all_gather_into_tensor( + # output, input_tensor, group=comm_group) + # 3. 返回 gathered 张量 + return input_tensor + + +def hccl_reduce_scatter( + input_tensor: torch.Tensor, + comm_group: object, + world_size: int, +) -> torch.Tensor: + """通过 HCCL 执行 ReduceScatter 操作。 + + MLA 的 output projection 后需要跨 TP rank 做 ReduceScatter。 + + Args: + input_tensor: 当前 rank 的局部结果,shape = [num_tokens, total_dim] + comm_group: HCCL 通信组句柄 + world_size: 通信组大小 + + Returns: + reduced: ReduceScatter 后的张量,shape = [num_tokens, total_dim // world_size] + """ + # --- 执行逻辑 --- + # 1. 分配输出 buffer:shape = [num_tokens, total_dim // world_size] + # 2. 调用 HCCL ReduceScatter(求和归约): + # torch.distributed.reduce_scatter_tensor( + # output, input_tensor, op=ReduceOp.SUM, group=comm_group) + # 3. 返回 reduced 张量 + return input_tensor diff --git a/nanovllm/layers/ascend/npu_ops.py b/nanovllm/layers/ascend/npu_ops.py new file mode 100644 index 000000000..7819578d4 --- /dev/null +++ b/nanovllm/layers/ascend/npu_ops.py @@ -0,0 +1,184 @@ +"""昇腾 910C NPU 算子桩函数。 + +针对达芬奇(DaVinci)架构的分离式芯片特点: +- AI Core(向量/矩阵计算单元)与 AI CPU(调度单元)分离 +- 内存搬运通过 DMA 引擎完成,每次 DMA 事务有固定调度开销 +- 小 block 分散传输会产生大量 DMA 事务,调度和编译开销高 + +核心优化策略: + scatter → 将分散的小 KV block 打包到连续 buffer + burst → 以连续大块进行 DMA 传输,降低事务数和调度开销 +""" + +import torch + + +def npu_scatter_kv_blocks( + k_cache: torch.Tensor, + v_cache: torch.Tensor, + block_ids: list[int], + block_size: int, +) -> tuple[torch.Tensor, torch.Tensor]: + """将分散在 NPU 显存中的多个小 KV block 打包(scatter)到连续 buffer。 + + 达芬奇架构下,每次 DMA 事务有固定的调度开销(约数微秒)。 + 若逐 block 传输(如 16 个 256-token block),会产生 16 次 DMA 事务。 + 先 scatter 到连续 buffer 再做一次 burst 传输,事务数降为 1。 + + Args: + k_cache: NPU 端整层 K cache,shape = [num_blocks, block_size, num_kv_heads, head_dim] + v_cache: NPU 端整层 V cache,shape 同上 + block_ids: 需要打包的物理 block ID 列表(可能不连续) + block_size: 每个 block 的 token 数 + + Returns: + k_packed: 连续打包后的 K 数据,shape = [len(block_ids) * block_size, num_kv_heads, head_dim] + v_packed: 连续打包后的 V 数据,shape 同上 + """ + # --- 执行逻辑 --- + # 1. 计算 packed buffer 总大小 = len(block_ids) * block_size * num_kv_heads * head_dim + # 2. 在 NPU 上分配连续的 k_packed, v_packed 缓冲区 + # 3. 利用达芬奇 Vector Core 的 scatter 指令(或 torch_npu.npu_scatter): + # 对每个 block_id,将 k_cache[block_id] 和 v_cache[block_id] + # 拷贝到 packed buffer 的对应偏移位置 + # 4. 关键:scatter 操作在 AI Core 上完成,无需 AI CPU 调度介入, + # 因此多 block 打包只有一次算子下发开销 + # 5. 返回 (k_packed, v_packed) + pass + + +def npu_gather_kv_blocks( + k_packed: torch.Tensor, + v_packed: torch.Tensor, + k_cache: torch.Tensor, + v_cache: torch.Tensor, + block_ids: list[int], + block_size: int, +) -> None: + """将连续 buffer 中的 KV 数据分发(gather/scatter-back)到 NPU cache 的各 block 位置。 + + npu_scatter_kv_blocks 的逆操作:CPU→NPU burst 传输完成后, + 将连续 buffer 拆分写回各 block slot。 + + Args: + k_packed: 从 CPU burst 传输回来的连续 K 数据 + v_packed: 从 CPU burst 传输回来的连续 V 数据 + k_cache: NPU 端整层 K cache(写入目标) + v_cache: NPU 端整层 V cache(写入目标) + block_ids: 目标物理 block ID 列表 + block_size: 每个 block 的 token 数 + + Returns: + None(原地写入 k_cache / v_cache 的指定 block 位置) + """ + # --- 执行逻辑 --- + # 1. 对 k_packed / v_packed 按 block_size 切分为 len(block_ids) 段 + # 2. 利用达芬奇 Vector Core 的 scatter 写指令: + # 将每段写入 k_cache[block_ids[i]] 和 v_cache[block_ids[i]] + # 3. 同 scatter 打包一样,仅一次算子下发,避免多次 AI CPU 调度 + pass + + +def npu_burst_copy_to_cpu( + npu_tensor: torch.Tensor, + cpu_tensor: torch.Tensor, +) -> None: + """通过 DMA burst 模式将 NPU 连续内存块拷贝到 CPU pin_memory。 + + burst 模式下 DMA 引擎一次性搬运整块连续数据,相比逐小块拷贝: + - 只需 1 次 DMA 事务调度(而非 N 次) + - 编译器只需生成 1 条 DMA 指令(减少编译开销) + - 充分利用 910C 的 HBM→DDR 带宽 + + Args: + npu_tensor: NPU 端连续张量(源),由 scatter 打包得到 + cpu_tensor: CPU 端 pin_memory 张量(目标),形状与 npu_tensor 一致 + + Returns: + None(异步传输,调用后数据可能尚未到达 CPU) + """ + # --- 执行逻辑 --- + # 1. 断言 npu_tensor 是连续的(is_contiguous) + # 2. 断言 cpu_tensor 使用 pin_memory 分配 + # 3. 调用 torch_npu 的异步 DMA 拷贝接口: + # torch.npu.current_stream().memcpy_async(cpu_tensor, npu_tensor) + # 底层映射到达芬奇 DMA Engine 的 burst 传输模式 + # 4. 不做 stream 同步——交由上层决定何时 sync + pass + + +def npu_burst_copy_to_npu( + cpu_tensor: torch.Tensor, + npu_tensor: torch.Tensor, +) -> None: + """通过 DMA burst 模式将 CPU pin_memory 拷贝到 NPU 连续内存。 + + 与 npu_burst_copy_to_cpu 对称,用于 swap in(CPU→NPU)场景。 + + Args: + cpu_tensor: CPU 端 pin_memory 张量(源) + npu_tensor: NPU 端连续张量(目标) + + Returns: + None(异步传输) + """ + # --- 执行逻辑 --- + # 1. 断言 cpu_tensor 是 pin_memory + # 2. 断言 npu_tensor 是连续的 + # 3. 调用 torch_npu 异步 DMA: + # torch.npu.current_stream().memcpy_async(npu_tensor, cpu_tensor) + # 4. 不做 stream 同步 + pass + + +def npu_kv_rmsnorm_rope_cache( + kv_no_split: torch.Tensor, + cos: torch.Tensor, + sin: torch.Tensor, + kv_cache: list[torch.Tensor], + slots: torch.Tensor, + kv_lora_rank: int, + qk_rope_head_dim: int, + eps: float = 1e-6, +) -> tuple[torch.Tensor, torch.Tensor]: + """达芬奇架构 MLA 专用融合算子:RMSNorm + RoPE + KV Cache 写入一步完成。 + + 在 MLA 中,每一步需要对 kv_c(压缩 KV 隐向量)做 RMSNorm, + 对 k_pe(位置编码部分)做 RoPE,然后写入 KV Cache。 + 在 910C 的 AI Core 上将这三步融合为一个算子,避免中间张量的 + HBM 读写和多次算子下发。 + + Args: + kv_no_split: 压缩后的 KV 拼接向量,shape = [num_tokens, kv_lora_rank + qk_rope_head_dim] + 前 kv_lora_rank 维是 kv_c,后 qk_rope_head_dim 维是 k_pe + cos: RoPE 余弦,shape = [num_tokens, qk_rope_head_dim] + sin: RoPE 正弦,shape = [num_tokens, qk_rope_head_dim] + kv_cache: 两个张量的列表 [kv_c_cache, k_pe_cache] + kv_c_cache: shape = [num_blocks, block_size, kv_lora_rank] + k_pe_cache: shape = [num_blocks, block_size, qk_rope_head_dim] + slots: 每个 token 的 cache slot 位置,shape = [num_tokens] + kv_lora_rank: KV 低秩压缩的秩 + qk_rope_head_dim: RoPE 位置编码的维度 + eps: RMSNorm 的 epsilon + + Returns: + kv_c_normed: RMSNorm 归一化后的 kv_c,shape = [num_tokens, kv_lora_rank] + k_pe_roped: RoPE 旋转后的 k_pe,shape = [num_tokens, qk_rope_head_dim] + """ + # --- 执行逻辑 --- + # 1. 将 kv_no_split 沿最后一维拆分为 kv_c 和 k_pe: + # kv_c = kv_no_split[:, :kv_lora_rank] + # k_pe = kv_no_split[:, kv_lora_rank:] + # 2. 对 kv_c 执行 RMSNorm: + # kv_c_normed = kv_c * rsqrt(mean(kv_c^2) + eps) + # 3. 对 k_pe 执行 RoPE: + # k_pe_roped = k_pe * cos + rotate_half(k_pe) * sin + # 4. 将 kv_c_normed 写入 kv_cache[0] 对应 slot 位置 + # 将 k_pe_roped 写入 kv_cache[1] 对应 slot 位置 + # 5. 以上四步在达芬奇 AI Core 上融合执行: + # - 使用 Cube Unit 做矩阵乘(RMSNorm 的 rsqrt 部分) + # - 使用 Vector Unit 做逐元素 RoPE 旋转 + # - 使用 MTE (Memory Transfer Engine) 做 cache 写入 + # 全程无中间 HBM 读写,只有一次算子下发 + # 6. 返回 (kv_c_normed, k_pe_roped) + pass diff --git a/nanovllm/layers/attention_utils.py b/nanovllm/layers/attention_utils.py index 69b71ab98..2bbb0fb89 100644 --- a/nanovllm/layers/attention_utils.py +++ b/nanovllm/layers/attention_utils.py @@ -4,6 +4,13 @@ - prefill 前:从 CPU 换入(swap in)该层 KV Cache - prefill 后:将该层 KV Cache 卸载(swap out)到 CPU +支持两种 KV Cache 格式: +- 标准 MHA:传入 (k_cache, v_cache) 两个张量 +- MLA 压缩格式:传入 (kv_c_cache, k_pe_cache) 两个张量 +两者接口统一,通过 *kv_cache_tensors 可变参数适配。 + +昇腾 910C 上,检测到 NPU device 时自动选择 scatter+burst 传输路径。 + 对应 vllm-ascend PR #1659 中从 attention_v1.py 提取到 utils.py 的两个函数。 """ @@ -12,15 +19,20 @@ # 全局 connector 引用,由 ModelRunner 初始化时设置 _CONNECTOR = None +# 是否使用昇腾 scatter+burst 路径(由 _maybe_init 时根据 device 类型设置) +_USE_ASCEND_BURST = False + -def set_connector(connector) -> None: +def set_connector(connector, use_ascend_burst: bool = False) -> None: """设置全局 CPU Offload Connector 实例。 Args: - connector: CPUOffloadConnector 实例,或 None 表示不启用 offload + connector: CPUOffloadConnector 实例,或 None 表示不启用 offload + use_ascend_burst: 是否启用昇腾 scatter+burst 传输路径 """ - global _CONNECTOR + global _CONNECTOR, _USE_ASCEND_BURST _CONNECTOR = connector + _USE_ASCEND_BURST = use_ascend_burst def get_connector(): @@ -34,45 +46,55 @@ def get_connector(): def wait_for_kv_layer_from_connector( layer_name: str, - k_cache: torch.Tensor, - v_cache: torch.Tensor, + *kv_cache_tensors: torch.Tensor, ) -> None: """在 Attention forward 之前调用,等待从 CPU 换入该层的 KV Cache。 若未启用 CPU Offload(connector 为 None),则直接返回不做任何操作。 + 支持两种调用方式: + - 标准 MHA:wait_for_kv_layer_from_connector(name, k_cache, v_cache) + - MLA 格式:wait_for_kv_layer_from_connector(name, kv_c_cache, k_pe_cache) + Args: - layer_name: 当前层名称,如 "layers.0.self_attn" - k_cache: GPU 端该层的 K cache 张量 - v_cache: GPU 端该层的 V cache 张量 + layer_name: 当前层名称,如 "layers.0.self_attn" + *kv_cache_tensors: GPU/NPU 端该层的 KV cache 张量(2 个) Returns: - None(原地写入 k_cache / v_cache) + None(原地写入 kv_cache_tensors) """ # --- 执行逻辑 --- # 1. 检查全局 _CONNECTOR 是否为 None,若是则直接 return # 2. 从当前推理上下文(get_context())中获取 seq_id、block_ids 等信息 # 3. 检查该层对应的 block 是否已被 swap out 到 CPU - # 4. 若是,调用 connector.recv_kv_caches_and_hidden_states( - # layer_name, seq_id, k_cache, v_cache, block_ids, token_length, is_prefill) - # 将 CPU 上缓存的 KV 数据拷贝回 GPU 端的 k_cache / v_cache - # 5. 等待异步拷贝完成(CUDA stream 同步) + # 4. 根据 _USE_ASCEND_BURST 选择传输路径: + # a. 昇腾路径:调用 connector.ascend_swap_in_with_burst_gather( + # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], + # block_ids, token_length, is_prefill) + # → burst DMA 传输 + AI Core gather 分发 + # b. 通用路径:调用 connector.recv_kv_caches_and_hidden_states( + # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], + # block_ids, token_length, is_prefill) + # → 逐 block CUDA memcpy + # 5. 等待传输完成(stream 同步),确保后续算子读到有效数据 pass def maybe_save_kv_layer_to_connector( layer_name: str, - k_cache: torch.Tensor, - v_cache: torch.Tensor, + *kv_cache_tensors: torch.Tensor, ) -> None: """在 prefill Attention forward 之后调用,尝试将该层 KV Cache 卸载到 CPU。 仅在 prefill 阶段且启用了 CPU Offload 时生效。 + 支持两种调用方式: + - 标准 MHA:maybe_save_kv_layer_to_connector(name, k_cache, v_cache) + - MLA 格式:maybe_save_kv_layer_to_connector(name, kv_c_cache, k_pe_cache) + Args: - layer_name: 当前层名称 - k_cache: GPU 端该层的 K cache 张量 - v_cache: GPU 端该层的 V cache 张量 + layer_name: 当前层名称 + *kv_cache_tensors: GPU/NPU 端该层的 KV cache 张量(2 个) Returns: None @@ -81,8 +103,14 @@ def maybe_save_kv_layer_to_connector( # 1. 检查全局 _CONNECTOR 是否为 None,若是则直接 return # 2. 从当前推理上下文中获取 seq_id、block_ids、token_length、is_prefill # 3. 仅在 is_prefill == True 时执行卸载 - # 4. 调用 connector.send_kv_caches_and_hidden_states( - # layer_name, seq_id, k_cache, v_cache, block_ids, token_length, is_prefill) - # 通过异步拷贝将 GPU 上的 KV Cache 数据写入 CPU pin_memory 区域 - # 5. 无需等待完成——异步执行以减少对推理延迟的影响 + # 4. 根据 _USE_ASCEND_BURST 选择传输路径: + # a. 昇腾路径:调用 connector.ascend_swap_out_with_scatter_burst( + # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], + # block_ids, token_length, is_prefill) + # → AI Core scatter 打包 + burst DMA 传输 + # → 对 MLA 格式尤其高效:kv_c + k_pe 数据量远小于完整 K+V + # b. 通用路径:调用 connector.send_kv_caches_and_hidden_states( + # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], + # block_ids, token_length, is_prefill) + # 5. 异步执行不等待——减少对推理延迟的影响 pass diff --git a/nanovllm/layers/mla_attention.py b/nanovllm/layers/mla_attention.py new file mode 100644 index 000000000..199828bda --- /dev/null +++ b/nanovllm/layers/mla_attention.py @@ -0,0 +1,190 @@ +"""Multi-head Latent Attention (MLA) 实现。 + +MLA 是 DeepSeek-V2 提出的注意力机制,核心思想: + - 不存储完整的 K、V(每头 head_dim 维),而是压缩为低秩隐向量 kv_c(kv_lora_rank 维) + - KV Cache 只存 (kv_c, k_pe),相比标准 MHA 大幅节省显存 + - 计算 Attention 时再通过 kv_b_proj 将 kv_c 展开为完整的 K_nope, V + +在昇腾 910C 上进一步优化: + - 使用融合算子 npu_kv_rmsnorm_rope_cache 将 RMSNorm + RoPE + Cache 写入一步完成 + - CPU Offload 时对压缩后的 kv_c / k_pe 做 scatter+burst,数据量远小于完整 KV + +对应 vllm-ascend PR #1659 中的 mla_v1.py。 +""" + +import torch +from torch import nn + +from nanovllm.utils.context import get_context +from nanovllm.layers.attention_utils import ( + wait_for_kv_layer_from_connector, + maybe_save_kv_layer_to_connector, +) + + +class MLAAttention(nn.Module): + """Multi-head Latent Attention 模块。 + + KV 压缩流程: + hidden → kv_a_proj_with_mqa → [kv_c (kv_lora_rank), k_pe (qk_rope_head_dim)] + kv_c → kv_a_layernorm → kv_b_proj → [k_nope (num_heads * qk_nope_head_dim), v (num_heads * v_head_dim)] + + Query 压缩流程: + hidden → q_a_proj → q_c (q_lora_rank) + q_c → q_a_layernorm → q_b_proj → [q_nope (num_heads * qk_nope_head_dim), q_pe (num_heads * qk_rope_head_dim)] + + KV Cache 只存 [kv_c_cache, k_pe_cache],不存完整 K/V。 + + Args: + hidden_size: 模型隐藏层维度 + num_heads: 注意力头数 + qk_nope_head_dim: 每头 query/key 的非 RoPE 部分维度 + qk_rope_head_dim: 每头 query/key 的 RoPE 部分维度 + v_head_dim: 每头 value 的维度 + q_lora_rank: query 低秩压缩的秩 + kv_lora_rank: KV 低秩压缩的秩 + rope_theta: RoPE 的 theta 基数 + layer_name: 层名称标识,用于 CPU Offload 按层管理 + """ + + def __init__( + self, + hidden_size: int, + num_heads: int, + qk_nope_head_dim: int, + qk_rope_head_dim: int, + v_head_dim: int, + q_lora_rank: int, + kv_lora_rank: int, + rope_theta: float = 10000.0, + layer_name: str = "", + ): + super().__init__() + self.hidden_size = hidden_size + self.num_heads = num_heads + self.qk_nope_head_dim = qk_nope_head_dim + self.qk_rope_head_dim = qk_rope_head_dim + self.qk_head_dim = qk_nope_head_dim + qk_rope_head_dim + self.v_head_dim = v_head_dim + self.q_lora_rank = q_lora_rank + self.kv_lora_rank = kv_lora_rank + self.layer_name = layer_name + + self.scaling = self.qk_head_dim ** -0.5 + + # --- 以下权重矩阵在实际实现中为 nn.Linear / 并行 Linear --- + # q_a_proj: hidden_size → q_lora_rank (查询压缩) + # q_a_layernorm: RMSNorm(q_lora_rank) + # q_b_proj: q_lora_rank → num_heads * qk_head_dim (查询展开) + # kv_a_proj_with_mqa: hidden_size → kv_lora_rank + qk_rope_head_dim (KV压缩+位置) + # kv_a_layernorm: RMSNorm(kv_lora_rank) + # kv_b_proj: kv_lora_rank → num_heads * (qk_nope_head_dim + v_head_dim) (KV展开) + # o_proj: num_heads * v_head_dim → hidden_size (输出投影) + + # KV Cache:压缩表示,比标准 MHA 小得多 + # kv_c_cache: [num_blocks, block_size, kv_lora_rank] + # k_pe_cache: [num_blocks, block_size, qk_rope_head_dim] + self.kv_cache: list[torch.Tensor] = [] # [kv_c_cache, k_pe_cache] + + def _mla_preprocess( + self, + layer_name: str, + hidden_states: torch.Tensor, + slot_mapping: torch.Tensor, + is_prefill: bool, + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor | None]: + """MLA 预处理:压缩投影 + RMSNorm + RoPE + KV Cache 写入。 + + 对应 vllm-ascend 中 AscendMLAImpl._mla_preprocess(), + PR #1659 将 layer_name 加入签名以支持按层 CPU Offload。 + + Args: + layer_name: 当前层名称,用于 CPU Offload 按层定位 + hidden_states: 输入隐藏状态,shape = [num_tokens, hidden_size] + slot_mapping: 每个 token 在 KV Cache 中的 slot 位置 + is_prefill: 是否为 prefill 阶段 + + Returns: + q_nope: 查询非 RoPE 部分,shape = [num_tokens, num_heads, qk_nope_head_dim] + q_pe: 查询 RoPE 部分,shape = [num_tokens, num_heads, qk_rope_head_dim] + k_nope: 键非 RoPE 部分,shape = [num_tokens, num_heads, qk_nope_head_dim](prefill) + 或从 kv_c_cache 中重建(decode) + k_pe: 键 RoPE 部分,shape = [num_tokens, 1, qk_rope_head_dim] + v: 值,shape = [num_tokens, num_heads, v_head_dim](仅 prefill 返回) + decode 时为 None,由 attention kernel 内部从 cache 重建 + """ + # --- CPU Offload: 计算前先尝试从 CPU 换入该层的压缩 KV Cache --- + wait_for_kv_layer_from_connector(layer_name, *self.kv_cache) + + # --- 执行逻辑 --- + # 1. Query 压缩路径: + # q_c = q_a_proj(hidden_states) # [num_tokens, q_lora_rank] + # q_c = q_a_layernorm(q_c) # RMSNorm 归一化 + # q = q_b_proj(q_c) # [num_tokens, num_heads * qk_head_dim] + # q = q.view(num_tokens, num_heads, qk_head_dim) + # q_nope, q_pe = q.split([qk_nope_head_dim, qk_rope_head_dim], dim=-1) + # 对 q_pe 施加 RoPE 旋转 + # + # 2. KV 压缩路径: + # kv_no_split = kv_a_proj_with_mqa(hidden_states) # [num_tokens, kv_lora_rank + qk_rope_head_dim] + # + # 3. 在昇腾 910C 上,使用融合算子一步完成 RMSNorm + RoPE + Cache 写入: + # kv_c_normed, k_pe_roped = npu_kv_rmsnorm_rope_cache( + # kv_no_split, cos, sin, self.kv_cache, slot_mapping, + # kv_lora_rank, qk_rope_head_dim) + # (该融合算子在 AI Core 上执行,避免中间张量的 HBM 读写) + # + # 4. Prefill 分支: + # k_nope, v = kv_b_proj(kv_c_normed) 展开为完整 K_nope 和 V + # 返回 (q_nope, q_pe, k_nope, k_pe_roped, v) + # + # 5. Decode 分支: + # k_nope 和 v 不在此处展开——由 decode attention kernel + # 在读取 kv_c_cache 时通过 absorbed kv_b_proj 权重直接计算 + # 返回 (q_nope, q_pe, None, k_pe_roped, None) + pass + + def forward( + self, + layer_name: str, + hidden_states: torch.Tensor, + ) -> torch.Tensor: + """MLA forward:预处理 → Attention 计算 → 输出投影。 + + 对应 vllm-ascend 中 AscendMLAImpl.forward(), + PR #1659 增加 layer_name 参数以支持 CPU Offload。 + + Args: + layer_name: 当前层名称 + hidden_states: 输入隐藏状态,shape = [num_tokens, hidden_size] + + Returns: + output: 注意力输出,shape = [num_tokens, hidden_size] + """ + context = get_context() + + # --- 执行逻辑 --- + # 1. 调用 _mla_preprocess 获取 q_nope, q_pe, k_nope, k_pe, v + # q_nope, q_pe, k_nope, k_pe, v = self._mla_preprocess( + # layer_name, hidden_states, context.slot_mapping, context.is_prefill) + # + # 2. Prefill Attention(使用 flash_attn_varlen_func 或等效昇腾算子): + # a. 拼接 q = concat(q_nope, q_pe) → [num_tokens, num_heads, qk_head_dim] + # b. 拼接 k = concat(k_nope, k_pe.expand(num_heads)) → [num_tokens, num_heads, qk_head_dim] + # c. o = flash_attn_varlen(q, k, v, ..., causal=True) + # + # 3. Decode Attention(使用 flash_attn_with_kvcache 的 MLA 变体): + # a. 拼接 q = concat(q_nope, q_pe) + # b. attention kernel 内部直接从 kv_c_cache 和 k_pe_cache 读取, + # 通过 absorbed kv_b_proj 权重在 on-the-fly 展开 k_nope 和 v + # (避免在 HBM 中存储展开后的完整 K/V,节省带宽) + # c. o = mla_decode_attention(q, kv_c_cache, k_pe_cache, kv_b_proj_weight, ...) + # + # 4. CPU Offload: prefill 后将该层压缩 KV Cache 卸载到 CPU + # maybe_save_kv_layer_to_connector(layer_name, *self.kv_cache) + # + # 5. 输出投影: + # output = o_proj(o.flatten(-2, -1)) # [num_tokens, hidden_size] + # + # 6. 返回 output + pass diff --git a/nanovllm/models/deepseek_v2.py b/nanovllm/models/deepseek_v2.py new file mode 100644 index 000000000..2a511e5a4 --- /dev/null +++ b/nanovllm/models/deepseek_v2.py @@ -0,0 +1,249 @@ +"""DeepSeek-V2 模型定义(MLA + MoE 架构)。 + +还原 vllm-ascend PR #1659 中对 DeepSeek V2 模型的 CPU Offload 适配: +- DeepSeekV2Attention 使用 MLA(Multi-head Latent Attention),KV Cache 为压缩表示 +- layer_name 从 DecoderLayer 逐层传递到 MLAAttention,供 CPU Offload 按层管理 +- MoE(Mixture of Experts)层使用标准前馈网络桩 + +在昇腾 910C 上的特殊优化: +- MLA 的 KV Cache 是压缩的 (kv_c, k_pe),数据量远小于标准 MHA +- CPU Offload 时 scatter+burst 传输的数据量更小,进一步降低传输开销 +""" + +import torch +from torch import nn + +from nanovllm.layers.mla_attention import MLAAttention +from nanovllm.layers.layernorm import RMSNorm + + +class DeepSeekV2Attention(nn.Module): + """DeepSeek-V2 注意力层包装器。 + + 将 MLA 注意力封装为标准的 Attention 接口, + 负责 layer_name 的传递和 CPU Offload 的集成。 + + Args: + config: DeepSeek-V2 模型配置 + layer_idx: 当前层索引,用于生成 layer_name + """ + + def __init__(self, config, layer_idx: int = 0): + super().__init__() + self.layer_name = f"layers.{layer_idx}.self_attn" + + # --- 实际实现中从 config 读取以下参数 --- + # config.hidden_size: 隐藏层维度(如 5120) + # config.num_attention_heads: 注意力头数(如 128) + # config.qk_nope_head_dim: 非 RoPE 维度(如 128) + # config.qk_rope_head_dim: RoPE 维度(如 64) + # config.v_head_dim: V 维度(如 128) + # config.q_lora_rank: Q 压缩秩(如 1536) + # config.kv_lora_rank: KV 压缩秩(如 512) + + self.attn = None # MLAAttention 实例,实际实现中在此创建 + + def forward( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + ) -> torch.Tensor: + """DeepSeek-V2 注意力前向。 + + Args: + positions: 位置编码,shape = [num_tokens] + hidden_states: 输入隐藏状态,shape = [num_tokens, hidden_size] + + Returns: + output: 注意力输出,shape = [num_tokens, hidden_size] + """ + # --- 执行逻辑 --- + # 1. 调用 MLA 注意力,传入 layer_name 以支持按层 CPU Offload: + # output = self.attn(self.layer_name, hidden_states) + # + # 2. MLA 内部会: + # a. 调用 wait_for_kv_layer_from_connector 尝试 swap in + # b. 执行压缩投影 + RMSNorm + RoPE(910C 上为融合算子) + # c. 执行 Attention 计算 + # d. 调用 maybe_save_kv_layer_to_connector 尝试 swap out + # e. 输出投影 + # + # 3. 返回 output + pass + + +class DeepSeekV2MoEMLP(nn.Module): + """DeepSeek-V2 MoE(Mixture of Experts)前馈网络。 + + Args: + config: 模型配置 + layer_idx: 层索引 + """ + + def __init__(self, config, layer_idx: int = 0): + super().__init__() + # --- 实际实现中包含以下组件 --- + # config.num_experts: 总专家数(如 160) + # config.num_experts_per_tok: 每 token 激活的专家数(如 6) + # config.moe_intermediate_size: 专家 FFN 中间维度 + # self.gate: 门控网络,hidden_size → num_experts + # self.experts: nn.ModuleList of FFN experts + # self.shared_expert: 共享专家 FFN(所有 token 都经过) + pass + + def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: + """MoE 前向:门控路由 + 专家并行计算。 + + Args: + hidden_states: 输入,shape = [num_tokens, hidden_size] + + Returns: + output: MoE 输出,shape = [num_tokens, hidden_size] + """ + # --- 执行逻辑 --- + # 1. 门控路由: + # router_logits = self.gate(hidden_states) # [num_tokens, num_experts] + # topk_weights, topk_ids = topk(router_logits, k=num_experts_per_tok) + # + # 2. 专家并行计算(在昇腾 910C 上可利用多 AI Core 并行): + # expert_output = sum( + # weight_i * expert_i(hidden_states) + # for weight_i, expert_i in selected_experts + # ) + # + # 3. 共享专家: + # shared_output = self.shared_expert(hidden_states) + # + # 4. 合并:output = expert_output + shared_output + # 5. 返回 output + pass + + +class DeepSeekV2DecoderLayer(nn.Module): + """DeepSeek-V2 解码器层:MLA Attention + MoE/Dense MLP。 + + Args: + config: 模型配置 + layer_idx: 层索引,传递给 Attention 以生成 layer_name + """ + + def __init__(self, config, layer_idx: int = 0): + super().__init__() + self.self_attn = DeepSeekV2Attention(config, layer_idx=layer_idx) + + # DeepSeek-V2 中部分层使用 MoE,部分使用 Dense MLP + # 由 config.moe_layer_freq 和 config.first_k_dense_replace 决定 + self.mlp = DeepSeekV2MoEMLP(config, layer_idx=layer_idx) + + self.input_layernorm = None # RMSNorm(hidden_size) + self.post_attention_layernorm = None # RMSNorm(hidden_size) + + def forward( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + residual: torch.Tensor | None, + ) -> tuple[torch.Tensor, torch.Tensor]: + """解码器层前向。 + + Args: + positions: 位置编码 + hidden_states: 输入隐藏状态 + residual: 残差连接张量 + + Returns: + hidden_states: 输出隐藏状态 + residual: 更新后的残差张量 + """ + # --- 执行逻辑 --- + # 1. 第一个残差连接 + LayerNorm: + # if residual is None: + # hidden_states, residual = input_layernorm(hidden_states), hidden_states + # else: + # hidden_states, residual = input_layernorm(hidden_states, residual) + # + # 2. MLA 注意力(内含 CPU Offload 钩子): + # hidden_states = self.self_attn(positions, hidden_states) + # + # 3. 第二个残差连接 + LayerNorm: + # hidden_states, residual = post_attention_layernorm(hidden_states, residual) + # + # 4. MoE / Dense MLP: + # hidden_states = self.mlp(hidden_states) + # + # 5. 返回 (hidden_states, residual) + pass + + +class DeepSeekV2ForCausalLM(nn.Module): + """DeepSeek-V2 因果语言模型。 + + 完整模型结构: + Embedding → [DecoderLayer × N] → RMSNorm → LMHead + + MLA + CPU Offload 集成: + 每个 DecoderLayer 通过 layer_idx 生成唯一 layer_name, + MLA 的压缩 KV Cache (kv_c, k_pe) 在 prefill 后可按层卸载到 CPU, + decode 时按需 swap in。 + + Args: + config: DeepSeek-V2 模型配置 + """ + + packed_modules_mapping = { + "q_a_proj": ("fused_qkv_a_proj", "q"), + "kv_a_proj_with_mqa": ("fused_qkv_a_proj", "kv"), + "gate_proj": ("gate_up_proj", 0), + "up_proj": ("gate_up_proj", 1), + } + + def __init__(self, config): + super().__init__() + # self.embed_tokens = VocabParallelEmbedding(config.vocab_size, config.hidden_size) + # self.layers = nn.ModuleList([ + # DeepSeekV2DecoderLayer(config, layer_idx=i) + # for i in range(config.num_hidden_layers) + # ]) + # self.norm = RMSNorm(config.hidden_size) + # self.lm_head = ParallelLMHead(config.vocab_size, config.hidden_size) + pass + + def forward( + self, + input_ids: torch.Tensor, + positions: torch.Tensor, + ) -> torch.Tensor: + """模型前向传播。 + + Args: + input_ids: 输入 token ID,shape = [num_tokens] + positions: 位置信息,shape = [num_tokens] + + Returns: + hidden_states: 最后一层的隐藏状态,shape = [num_tokens, hidden_size] + """ + # --- 执行逻辑 --- + # 1. hidden_states = embed_tokens(input_ids) + # 2. residual = None + # 3. for layer in self.layers: + # hidden_states, residual = layer(positions, hidden_states, residual) + # 每一层的 MLA Attention 内部会: + # - wait_for_kv_layer_from_connector(layer_name, kv_c_cache, k_pe_cache) + # - 执行压缩投影 + 融合 RMSNorm+RoPE+Cache 写入 + # - Flash Attention 计算 + # - maybe_save_kv_layer_to_connector(layer_name, kv_c_cache, k_pe_cache) + # 4. hidden_states, _ = norm(hidden_states, residual) + # 5. return hidden_states + pass + + def compute_logits(self, hidden_states: torch.Tensor) -> torch.Tensor: + """计算词表 logits。 + + Args: + hidden_states: shape = [num_tokens, hidden_size] + + Returns: + logits: shape = [num_tokens, vocab_size] + """ + # return self.lm_head(hidden_states) + pass From 95652253b00c4e5dc4d897a5af012a4d71519e30 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 12 Apr 2026 17:06:37 +0000 Subject: [PATCH 3/3] revert: remove scatter+burst optimization stubs The scatter+burst optimization was based on an incorrect model of the 910C DMA pipeline. After accounting for: - DMA Engine descriptor queue depth (16-32, naturally pipelined) - MLA block size (288 KB) being already transfer-bound - The extra HBM I/O cost of the scatter step (~44 us for 35 MB) - HBM bandwidth contention with MoE weight loading scatter+burst is actually 5-7% slower than the per-block pipelined path on MLA-sized blocks. This explains why the original vllm-ascend PR did not include this optimization. Remove: - nanovllm/layers/ascend/ (npu_ops, hccl_utils stubs) - CPUOffloadConnector.ascend_swap_*_with_*_burst methods - CPUKVCacheManager.{store,load}_packed_blocks - attention_utils _USE_ASCEND_BURST flag - MLA attention's reference to npu_kv_rmsnorm_rope_cache Keep MLA attention and DeepSeek V2 model stubs (those are valid restorations of the original PR's MLA support). https://claude.ai/code/session_01WTTVHqXWMtHAQt5picfcig --- nanovllm/distributed/cpu_offload_connector.py | 114 ----------- .../cpu_kv_cache_manager.py | 74 ------- nanovllm/layers/ascend/__init__.py | 12 -- nanovllm/layers/ascend/hccl_utils.py | 88 --------- nanovllm/layers/ascend/npu_ops.py | 184 ------------------ nanovllm/layers/attention_utils.py | 39 ++-- nanovllm/layers/mla_attention.py | 16 +- 7 files changed, 19 insertions(+), 508 deletions(-) delete mode 100644 nanovllm/layers/ascend/__init__.py delete mode 100644 nanovllm/layers/ascend/hccl_utils.py delete mode 100644 nanovllm/layers/ascend/npu_ops.py diff --git a/nanovllm/distributed/cpu_offload_connector.py b/nanovllm/distributed/cpu_offload_connector.py index a49af0f47..ca16e8129 100644 --- a/nanovllm/distributed/cpu_offload_connector.py +++ b/nanovllm/distributed/cpu_offload_connector.py @@ -4,11 +4,6 @@ 在 Connector 内部启动 metadata server,管理 CPU KV Cache 并提供 RPC 调用接口,支持多个 data-parallel EngineCore 实例之间共享 KV Cache。 对应 vllm-ascend PR #1659 中的 cpu_offload_connector.py。 - -昇腾 910C 适配: - 针对达芬奇架构分离式芯片特点,提供 scatter+burst 传输路径—— - 先将分散的小 block 打包为连续 buffer(scatter), - 再以 burst 模式做 DMA 传输,降低调度和编译开销。 """ import torch @@ -154,115 +149,6 @@ def should_swap_in(self, num_free_gpu_blocks: int) -> bool: # 否则返回 False return False - # ================================================================ - # 昇腾 910C 专用:scatter + burst 传输路径 - # ================================================================ - - def ascend_swap_out_with_scatter_burst( - self, - layer_name: str, - seq_id: int, - npu_k_cache: torch.Tensor, - npu_v_cache: torch.Tensor, - block_ids: list[int], - token_length: int, - is_prefill: bool, - ) -> bool: - """昇腾 910C 优化的 swap out:scatter 打包 + burst DMA 传输。 - - 标准路径(逐 block 拷贝)在达芬奇架构上的问题: - - 每个小 block 需要一次独立的 DMA 事务 - - 每次 DMA 事务有 ~5μs 的 AI CPU 调度开销 - - N 个 block → N 次调度 → 调度总开销 = N × 5μs - - 编译器需为每个 block 生成独立的 DMA 指令 - - 本方法的优化策略: - 1. scatter: 在 NPU 的 AI Core 上将 N 个分散 block 打包到连续 buffer - → 仅 1 次算子下发(AI Core 内部并行执行,无 AI CPU 调度) - 2. burst: 对连续 buffer 做 1 次 DMA burst 传输 - → 仅 1 次 DMA 调度,充分利用 HBM→DDR 带宽 - 总调度开销:~5μs(1 次 scatter 算子 + 1 次 DMA),而非 N × 5μs - - Args: - layer_name: 模型层名称 - seq_id: 序列 ID - npu_k_cache: NPU 端该层 K cache - npu_v_cache: NPU 端该层 V cache - block_ids: 需要卸载的 block ID 列表(在 NPU 显存中可能不连续) - token_length: 序列当前 token 长度 - is_prefill: 是否 prefill 阶段 - - Returns: - bool: 卸载是否成功 - """ - # --- 执行逻辑 --- - # 1. 调用 npu_scatter_kv_blocks(npu_k_cache, npu_v_cache, block_ids, block_size) - # 在 AI Core 上将分散的 block 打包到连续的 k_packed, v_packed - # - # 2. 在 CPU 端分配对应大小的 pin_memory buffer: - # cpu_k_buf = torch.empty_like(k_packed, device="cpu", pin_memory=True) - # cpu_v_buf = torch.empty_like(v_packed, device="cpu", pin_memory=True) - # - # 3. 调用 npu_burst_copy_to_cpu(k_packed, cpu_k_buf) 做 burst DMA - # 调用 npu_burst_copy_to_cpu(v_packed, cpu_v_buf) - # - # 4. 将 cpu_k_buf / cpu_v_buf 写入 manager 的 CPU 存储池 - # (按 block 粒度切分后存入 manager.cpu_k_cache / cpu_v_cache) - # - # 5. 记录 metadata 映射关系 - # 6. 返回 True - return False - - def ascend_swap_in_with_burst_gather( - self, - layer_name: str, - seq_id: int, - npu_k_cache: torch.Tensor, - npu_v_cache: torch.Tensor, - block_ids: list[int], - token_length: int, - is_prefill: bool, - ) -> bool: - """昇腾 910C 优化的 swap in:burst DMA 传输 + gather 分发。 - - scatter+burst 的逆操作: - 1. 从 CPU 端拼装连续 buffer - 2. burst DMA 一次性传输到 NPU - 3. gather 分发到各 block 位置 - - Args: - layer_name: 模型层名称 - seq_id: 序列 ID - npu_k_cache: NPU 端该层 K cache(写入目标) - npu_v_cache: NPU 端该层 V cache(写入目标) - block_ids: 需要加载的 block ID 列表 - token_length: 序列当前 token 长度 - is_prefill: 是否 prefill 阶段 - - Returns: - bool: 加载是否成功 - """ - # --- 执行逻辑 --- - # 1. 从 manager 的 CPU 存储池中读取对应 block 数据, - # 拼装为连续的 cpu_k_buf / cpu_v_buf - # - # 2. 在 NPU 端分配对应大小的连续接收 buffer: - # npu_k_buf = torch.empty_like(cpu_k_buf, device="npu") - # npu_v_buf = torch.empty_like(cpu_v_buf, device="npu") - # - # 3. 调用 npu_burst_copy_to_npu(cpu_k_buf, npu_k_buf) 做 burst DMA - # 调用 npu_burst_copy_to_npu(cpu_v_buf, npu_v_buf) - # - # 4. 等待 DMA 传输完成(stream 同步,确保数据到达 NPU) - # - # 5. 调用 npu_gather_kv_blocks(npu_k_buf, npu_v_buf, - # npu_k_cache, npu_v_cache, block_ids, block_size) - # 在 AI Core 上将连续 buffer 分发到各 block 位置 - # - # 6. 释放 CPU 端对应 block,更新 metadata - # 7. 返回 True - return False - def close(self) -> None: """关闭连接器,释放 CPU 资源并停止 metadata server。 diff --git a/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py b/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py index 742811ecf..fbc3be205 100644 --- a/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py +++ b/nanovllm/distributed/cpu_offload_manager/cpu_kv_cache_manager.py @@ -174,77 +174,3 @@ def clear_seq(self, seq_id: int) -> None: # 2. 对每个匹配条目调用 free_cpu_block 释放 # 3. 从 block_mapping 中删除这些条目 pass - - # ================================================================ - # 昇腾 910C 专用:适配 scatter+burst 传输模式 - # ================================================================ - - def store_packed_blocks( - self, - layer_name: str, - seq_id: int, - k_packed: torch.Tensor, - v_packed: torch.Tensor, - block_ids: list[int], - ) -> bool: - """将 scatter 打包后的连续 KV 数据按 block 粒度存入 CPU 存储池。 - - 由 CPUOffloadConnector.ascend_swap_out_with_scatter_burst 在 - burst DMA 传输完成后调用。此时 k_packed/v_packed 已在 CPU pin_memory 上。 - - Args: - layer_name: 层名称 - seq_id: 序列 ID - k_packed: 打包后的连续 K 数据(CPU 端), - shape = [len(block_ids) * block_size, num_kv_heads, head_dim] - v_packed: 打包后的连续 V 数据(CPU 端),shape 同上 - block_ids: 对应的 NPU 端 block ID 列表 - - Returns: - bool: 存储是否成功 - """ - # --- 执行逻辑 --- - # 1. 将 k_packed / v_packed 按 block_size 切分为 len(block_ids) 段 - # 2. 对每段: - # a. 调用 allocate_cpu_block() 获取 cpu_block_id - # b. 若分配失败,回滚并返回 False - # c. 将该段直接拷贝(CPU→CPU memcpy,零拷贝或 pin_memory 内拷贝) - # 到 cpu_k_cache[layer_idx][cpu_block_id] 和 cpu_v_cache 对应位置 - # d. 记录 block_mapping[(seq_id, layer_name, block_ids[i])] = cpu_block_id - # 3. 返回 True - return False - - def load_packed_blocks( - self, - layer_name: str, - seq_id: int, - block_ids: list[int], - ) -> tuple[torch.Tensor, torch.Tensor] | None: - """将 CPU 存储池中的多个 block 拼装为连续 buffer,供 burst DMA 传输回 NPU。 - - 由 CPUOffloadConnector.ascend_swap_in_with_burst_gather 在 - burst DMA 传输前调用。返回的连续 buffer 可直接用于 npu_burst_copy_to_npu。 - - Args: - layer_name: 层名称 - seq_id: 序列 ID - block_ids: 需要加载的 NPU 端 block ID 列表 - - Returns: - (k_packed, v_packed): 拼装好的连续 CPU 张量 - 若任意 block 不在 CPU 缓存中,返回 None - """ - # --- 执行逻辑 --- - # 1. 对每个 block_id in block_ids: - # a. 通过 block_mapping 查找对应的 cpu_block_id - # b. 若找不到,返回 None - # 2. 分配连续 CPU buffer: - # k_packed = torch.empty([len(block_ids) * block_size, num_kv_heads, head_dim], - # pin_memory=True) - # v_packed 同上 - # 3. 对每个 block: - # 将 cpu_k_cache[layer_idx][cpu_block_id] 拷贝到 k_packed 对应段 - # 将 cpu_v_cache[layer_idx][cpu_block_id] 拷贝到 v_packed 对应段 - # 4. 释放已加载的 cpu_block(调用 free_cpu_block) - # 5. 返回 (k_packed, v_packed) - return None diff --git a/nanovllm/layers/ascend/__init__.py b/nanovllm/layers/ascend/__init__.py deleted file mode 100644 index e97d46d28..000000000 --- a/nanovllm/layers/ascend/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -from nanovllm.layers.ascend.npu_ops import ( - npu_scatter_kv_blocks, - npu_gather_kv_blocks, - npu_burst_copy_to_cpu, - npu_burst_copy_to_npu, - npu_kv_rmsnorm_rope_cache, -) -from nanovllm.layers.ascend.hccl_utils import ( - hccl_init_comm_group, - hccl_all_gather, - hccl_reduce_scatter, -) diff --git a/nanovllm/layers/ascend/hccl_utils.py b/nanovllm/layers/ascend/hccl_utils.py deleted file mode 100644 index cfa908ee7..000000000 --- a/nanovllm/layers/ascend/hccl_utils.py +++ /dev/null @@ -1,88 +0,0 @@ -"""昇腾 HCCL (Huawei Collective Communication Library) 工具函数。 - -HCCL 是昇腾 NPU 上替代 NCCL 的集合通信库, -用于多卡/多机之间的张量通信(AllGather, ReduceScatter 等)。 -在 MLA + DP 并行场景下,用于跨 EngineCore 实例同步 KV Cache 元数据。 -""" - -import torch - - -def hccl_init_comm_group( - world_size: int, - rank: int, - group_name: str = "default", -) -> object: - """初始化 HCCL 通信组。 - - Args: - world_size: 通信组中的总进程数 - rank: 当前进程在通信组中的序号 - group_name: 通信组名称,用于区分不同用途的通信组 - 如 "dp_group"(数据并行)、"kv_transfer_group"(KV 传输) - - Returns: - comm_group: HCCL 通信组句柄,供后续通信操作使用 - """ - # --- 执行逻辑 --- - # 1. 调用 torch_npu.distributed 接口初始化进程组: - # torch.distributed.init_process_group( - # backend="hccl", world_size=world_size, rank=rank) - # 2. 创建命名子组: - # group = torch.distributed.new_group(ranks=list(range(world_size)), backend="hccl") - # 3. 验证 HCCL 链路连通性(ping-pong 测试) - # 4. 返回通信组句柄 - return None - - -def hccl_all_gather( - input_tensor: torch.Tensor, - comm_group: object, - world_size: int, -) -> torch.Tensor: - """通过 HCCL 执行 AllGather 操作。 - - 在 MLA 中,fused_qkv_a_proj 的输出 (q_c, kv_no_split) - 可能需要跨 TP rank 做 AllGather 以获取完整的隐向量。 - - Args: - input_tensor: 当前 rank 的局部张量,shape = [num_tokens, local_dim] - comm_group: HCCL 通信组句柄 - world_size: 通信组大小 - - Returns: - gathered: AllGather 后的完整张量,shape = [num_tokens, local_dim * world_size] - """ - # --- 执行逻辑 --- - # 1. 分配输出 buffer:shape = [num_tokens, local_dim * world_size] - # 2. 调用 HCCL AllGather: - # torch.distributed.all_gather_into_tensor( - # output, input_tensor, group=comm_group) - # 3. 返回 gathered 张量 - return input_tensor - - -def hccl_reduce_scatter( - input_tensor: torch.Tensor, - comm_group: object, - world_size: int, -) -> torch.Tensor: - """通过 HCCL 执行 ReduceScatter 操作。 - - MLA 的 output projection 后需要跨 TP rank 做 ReduceScatter。 - - Args: - input_tensor: 当前 rank 的局部结果,shape = [num_tokens, total_dim] - comm_group: HCCL 通信组句柄 - world_size: 通信组大小 - - Returns: - reduced: ReduceScatter 后的张量,shape = [num_tokens, total_dim // world_size] - """ - # --- 执行逻辑 --- - # 1. 分配输出 buffer:shape = [num_tokens, total_dim // world_size] - # 2. 调用 HCCL ReduceScatter(求和归约): - # torch.distributed.reduce_scatter_tensor( - # output, input_tensor, op=ReduceOp.SUM, group=comm_group) - # 3. 返回 reduced 张量 - return input_tensor diff --git a/nanovllm/layers/ascend/npu_ops.py b/nanovllm/layers/ascend/npu_ops.py deleted file mode 100644 index 7819578d4..000000000 --- a/nanovllm/layers/ascend/npu_ops.py +++ /dev/null @@ -1,184 +0,0 @@ -"""昇腾 910C NPU 算子桩函数。 - -针对达芬奇(DaVinci)架构的分离式芯片特点: -- AI Core(向量/矩阵计算单元)与 AI CPU(调度单元)分离 -- 内存搬运通过 DMA 引擎完成,每次 DMA 事务有固定调度开销 -- 小 block 分散传输会产生大量 DMA 事务,调度和编译开销高 - -核心优化策略: - scatter → 将分散的小 KV block 打包到连续 buffer - burst → 以连续大块进行 DMA 传输,降低事务数和调度开销 -""" - -import torch - - -def npu_scatter_kv_blocks( - k_cache: torch.Tensor, - v_cache: torch.Tensor, - block_ids: list[int], - block_size: int, -) -> tuple[torch.Tensor, torch.Tensor]: - """将分散在 NPU 显存中的多个小 KV block 打包(scatter)到连续 buffer。 - - 达芬奇架构下,每次 DMA 事务有固定的调度开销(约数微秒)。 - 若逐 block 传输(如 16 个 256-token block),会产生 16 次 DMA 事务。 - 先 scatter 到连续 buffer 再做一次 burst 传输,事务数降为 1。 - - Args: - k_cache: NPU 端整层 K cache,shape = [num_blocks, block_size, num_kv_heads, head_dim] - v_cache: NPU 端整层 V cache,shape 同上 - block_ids: 需要打包的物理 block ID 列表(可能不连续) - block_size: 每个 block 的 token 数 - - Returns: - k_packed: 连续打包后的 K 数据,shape = [len(block_ids) * block_size, num_kv_heads, head_dim] - v_packed: 连续打包后的 V 数据,shape 同上 - """ - # --- 执行逻辑 --- - # 1. 计算 packed buffer 总大小 = len(block_ids) * block_size * num_kv_heads * head_dim - # 2. 在 NPU 上分配连续的 k_packed, v_packed 缓冲区 - # 3. 利用达芬奇 Vector Core 的 scatter 指令(或 torch_npu.npu_scatter): - # 对每个 block_id,将 k_cache[block_id] 和 v_cache[block_id] - # 拷贝到 packed buffer 的对应偏移位置 - # 4. 关键:scatter 操作在 AI Core 上完成,无需 AI CPU 调度介入, - # 因此多 block 打包只有一次算子下发开销 - # 5. 返回 (k_packed, v_packed) - pass - - -def npu_gather_kv_blocks( - k_packed: torch.Tensor, - v_packed: torch.Tensor, - k_cache: torch.Tensor, - v_cache: torch.Tensor, - block_ids: list[int], - block_size: int, -) -> None: - """将连续 buffer 中的 KV 数据分发(gather/scatter-back)到 NPU cache 的各 block 位置。 - - npu_scatter_kv_blocks 的逆操作:CPU→NPU burst 传输完成后, - 将连续 buffer 拆分写回各 block slot。 - - Args: - k_packed: 从 CPU burst 传输回来的连续 K 数据 - v_packed: 从 CPU burst 传输回来的连续 V 数据 - k_cache: NPU 端整层 K cache(写入目标) - v_cache: NPU 端整层 V cache(写入目标) - block_ids: 目标物理 block ID 列表 - block_size: 每个 block 的 token 数 - - Returns: - None(原地写入 k_cache / v_cache 的指定 block 位置) - """ - # --- 执行逻辑 --- - # 1. 对 k_packed / v_packed 按 block_size 切分为 len(block_ids) 段 - # 2. 利用达芬奇 Vector Core 的 scatter 写指令: - # 将每段写入 k_cache[block_ids[i]] 和 v_cache[block_ids[i]] - # 3. 同 scatter 打包一样,仅一次算子下发,避免多次 AI CPU 调度 - pass - - -def npu_burst_copy_to_cpu( - npu_tensor: torch.Tensor, - cpu_tensor: torch.Tensor, -) -> None: - """通过 DMA burst 模式将 NPU 连续内存块拷贝到 CPU pin_memory。 - - burst 模式下 DMA 引擎一次性搬运整块连续数据,相比逐小块拷贝: - - 只需 1 次 DMA 事务调度(而非 N 次) - - 编译器只需生成 1 条 DMA 指令(减少编译开销) - - 充分利用 910C 的 HBM→DDR 带宽 - - Args: - npu_tensor: NPU 端连续张量(源),由 scatter 打包得到 - cpu_tensor: CPU 端 pin_memory 张量(目标),形状与 npu_tensor 一致 - - Returns: - None(异步传输,调用后数据可能尚未到达 CPU) - """ - # --- 执行逻辑 --- - # 1. 断言 npu_tensor 是连续的(is_contiguous) - # 2. 断言 cpu_tensor 使用 pin_memory 分配 - # 3. 调用 torch_npu 的异步 DMA 拷贝接口: - # torch.npu.current_stream().memcpy_async(cpu_tensor, npu_tensor) - # 底层映射到达芬奇 DMA Engine 的 burst 传输模式 - # 4. 不做 stream 同步——交由上层决定何时 sync - pass - - -def npu_burst_copy_to_npu( - cpu_tensor: torch.Tensor, - npu_tensor: torch.Tensor, -) -> None: - """通过 DMA burst 模式将 CPU pin_memory 拷贝到 NPU 连续内存。 - - 与 npu_burst_copy_to_cpu 对称,用于 swap in(CPU→NPU)场景。 - - Args: - cpu_tensor: CPU 端 pin_memory 张量(源) - npu_tensor: NPU 端连续张量(目标) - - Returns: - None(异步传输) - """ - # --- 执行逻辑 --- - # 1. 断言 cpu_tensor 是 pin_memory - # 2. 断言 npu_tensor 是连续的 - # 3. 调用 torch_npu 异步 DMA: - # torch.npu.current_stream().memcpy_async(npu_tensor, cpu_tensor) - # 4. 不做 stream 同步 - pass - - -def npu_kv_rmsnorm_rope_cache( - kv_no_split: torch.Tensor, - cos: torch.Tensor, - sin: torch.Tensor, - kv_cache: list[torch.Tensor], - slots: torch.Tensor, - kv_lora_rank: int, - qk_rope_head_dim: int, - eps: float = 1e-6, -) -> tuple[torch.Tensor, torch.Tensor]: - """达芬奇架构 MLA 专用融合算子:RMSNorm + RoPE + KV Cache 写入一步完成。 - - 在 MLA 中,每一步需要对 kv_c(压缩 KV 隐向量)做 RMSNorm, - 对 k_pe(位置编码部分)做 RoPE,然后写入 KV Cache。 - 在 910C 的 AI Core 上将这三步融合为一个算子,避免中间张量的 - HBM 读写和多次算子下发。 - - Args: - kv_no_split: 压缩后的 KV 拼接向量,shape = [num_tokens, kv_lora_rank + qk_rope_head_dim] - 前 kv_lora_rank 维是 kv_c,后 qk_rope_head_dim 维是 k_pe - cos: RoPE 余弦,shape = [num_tokens, qk_rope_head_dim] - sin: RoPE 正弦,shape = [num_tokens, qk_rope_head_dim] - kv_cache: 两个张量的列表 [kv_c_cache, k_pe_cache] - kv_c_cache: shape = [num_blocks, block_size, kv_lora_rank] - k_pe_cache: shape = [num_blocks, block_size, qk_rope_head_dim] - slots: 每个 token 的 cache slot 位置,shape = [num_tokens] - kv_lora_rank: KV 低秩压缩的秩 - qk_rope_head_dim: RoPE 位置编码的维度 - eps: RMSNorm 的 epsilon - - Returns: - kv_c_normed: RMSNorm 归一化后的 kv_c,shape = [num_tokens, kv_lora_rank] - k_pe_roped: RoPE 旋转后的 k_pe,shape = [num_tokens, qk_rope_head_dim] - """ - # --- 执行逻辑 --- - # 1. 将 kv_no_split 沿最后一维拆分为 kv_c 和 k_pe: - # kv_c = kv_no_split[:, :kv_lora_rank] - # k_pe = kv_no_split[:, kv_lora_rank:] - # 2. 对 kv_c 执行 RMSNorm: - # kv_c_normed = kv_c * rsqrt(mean(kv_c^2) + eps) - # 3. 对 k_pe 执行 RoPE: - # k_pe_roped = k_pe * cos + rotate_half(k_pe) * sin - # 4. 将 kv_c_normed 写入 kv_cache[0] 对应 slot 位置 - # 将 k_pe_roped 写入 kv_cache[1] 对应 slot 位置 - # 5. 以上四步在达芬奇 AI Core 上融合执行: - # - 使用 Cube Unit 做矩阵乘(RMSNorm 的 rsqrt 部分) - # - 使用 Vector Unit 做逐元素 RoPE 旋转 - # - 使用 MTE (Memory Transfer Engine) 做 cache 写入 - # 全程无中间 HBM 读写,只有一次算子下发 - # 6. 返回 (kv_c_normed, k_pe_roped) - pass diff --git a/nanovllm/layers/attention_utils.py b/nanovllm/layers/attention_utils.py index 2bbb0fb89..d6075226e 100644 --- a/nanovllm/layers/attention_utils.py +++ b/nanovllm/layers/attention_utils.py @@ -9,8 +9,6 @@ - MLA 压缩格式:传入 (kv_c_cache, k_pe_cache) 两个张量 两者接口统一,通过 *kv_cache_tensors 可变参数适配。 -昇腾 910C 上,检测到 NPU device 时自动选择 scatter+burst 传输路径。 - 对应 vllm-ascend PR #1659 中从 attention_v1.py 提取到 utils.py 的两个函数。 """ @@ -19,20 +17,15 @@ # 全局 connector 引用,由 ModelRunner 初始化时设置 _CONNECTOR = None -# 是否使用昇腾 scatter+burst 路径(由 _maybe_init 时根据 device 类型设置) -_USE_ASCEND_BURST = False - -def set_connector(connector, use_ascend_burst: bool = False) -> None: +def set_connector(connector) -> None: """设置全局 CPU Offload Connector 实例。 Args: - connector: CPUOffloadConnector 实例,或 None 表示不启用 offload - use_ascend_burst: 是否启用昇腾 scatter+burst 传输路径 + connector: CPUOffloadConnector 实例,或 None 表示不启用 offload """ - global _CONNECTOR, _USE_ASCEND_BURST + global _CONNECTOR _CONNECTOR = connector - _USE_ASCEND_BURST = use_ascend_burst def get_connector(): @@ -67,15 +60,10 @@ def wait_for_kv_layer_from_connector( # 1. 检查全局 _CONNECTOR 是否为 None,若是则直接 return # 2. 从当前推理上下文(get_context())中获取 seq_id、block_ids 等信息 # 3. 检查该层对应的 block 是否已被 swap out 到 CPU - # 4. 根据 _USE_ASCEND_BURST 选择传输路径: - # a. 昇腾路径:调用 connector.ascend_swap_in_with_burst_gather( - # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], - # block_ids, token_length, is_prefill) - # → burst DMA 传输 + AI Core gather 分发 - # b. 通用路径:调用 connector.recv_kv_caches_and_hidden_states( - # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], - # block_ids, token_length, is_prefill) - # → 逐 block CUDA memcpy + # 4. 调用 connector.recv_kv_caches_and_hidden_states( + # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], + # block_ids, token_length, is_prefill) + # 将 CPU 上缓存的 KV 数据拷贝回 GPU/NPU 端 # 5. 等待传输完成(stream 同步),确保后续算子读到有效数据 pass @@ -103,14 +91,9 @@ def maybe_save_kv_layer_to_connector( # 1. 检查全局 _CONNECTOR 是否为 None,若是则直接 return # 2. 从当前推理上下文中获取 seq_id、block_ids、token_length、is_prefill # 3. 仅在 is_prefill == True 时执行卸载 - # 4. 根据 _USE_ASCEND_BURST 选择传输路径: - # a. 昇腾路径:调用 connector.ascend_swap_out_with_scatter_burst( - # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], - # block_ids, token_length, is_prefill) - # → AI Core scatter 打包 + burst DMA 传输 - # → 对 MLA 格式尤其高效:kv_c + k_pe 数据量远小于完整 K+V - # b. 通用路径:调用 connector.send_kv_caches_and_hidden_states( - # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], - # block_ids, token_length, is_prefill) + # 4. 调用 connector.send_kv_caches_and_hidden_states( + # layer_name, seq_id, kv_cache_tensors[0], kv_cache_tensors[1], + # block_ids, token_length, is_prefill) + # 通过异步拷贝将 GPU/NPU 上的 KV Cache 数据写入 CPU pin_memory 区域 # 5. 异步执行不等待——减少对推理延迟的影响 pass diff --git a/nanovllm/layers/mla_attention.py b/nanovllm/layers/mla_attention.py index 199828bda..5b4d55450 100644 --- a/nanovllm/layers/mla_attention.py +++ b/nanovllm/layers/mla_attention.py @@ -5,9 +5,9 @@ - KV Cache 只存 (kv_c, k_pe),相比标准 MHA 大幅节省显存 - 计算 Attention 时再通过 kv_b_proj 将 kv_c 展开为完整的 K_nope, V -在昇腾 910C 上进一步优化: - - 使用融合算子 npu_kv_rmsnorm_rope_cache 将 RMSNorm + RoPE + Cache 写入一步完成 - - CPU Offload 时对压缩后的 kv_c / k_pe 做 scatter+burst,数据量远小于完整 KV +CPU Offload 适配: + MLA 的压缩 KV Cache 数据量远小于标准 MHA, + 使得 GPU↔CPU 卸载的传输量大幅降低,是长上下文场景的关键优化。 对应 vllm-ascend PR #1659 中的 mla_v1.py。 """ @@ -128,11 +128,11 @@ def _mla_preprocess( # 2. KV 压缩路径: # kv_no_split = kv_a_proj_with_mqa(hidden_states) # [num_tokens, kv_lora_rank + qk_rope_head_dim] # - # 3. 在昇腾 910C 上,使用融合算子一步完成 RMSNorm + RoPE + Cache 写入: - # kv_c_normed, k_pe_roped = npu_kv_rmsnorm_rope_cache( - # kv_no_split, cos, sin, self.kv_cache, slot_mapping, - # kv_lora_rank, qk_rope_head_dim) - # (该融合算子在 AI Core 上执行,避免中间张量的 HBM 读写) + # 3. 对 kv_c 做 RMSNorm,对 k_pe 施加 RoPE: + # kv_c, k_pe = kv_no_split.split([kv_lora_rank, qk_rope_head_dim], dim=-1) + # kv_c_normed = kv_a_layernorm(kv_c) + # k_pe_roped = rotary_emb(k_pe) + # 将 kv_c_normed 和 k_pe_roped 写入 self.kv_cache 对应 slot # # 4. Prefill 分支: # k_nope, v = kv_b_proj(kv_c_normed) 展开为完整 K_nope 和 V