Commit 2d39f1b

oandreeva-nv and ziqifan617 authored
feat: KVBM connector: enabling vectorized copy from pinned memory to device memory and vice versa (#2989)
Signed-off-by: Olga Andreeva <[email protected]>
Signed-off-by: oandreeva-nv <[email protected]>
Co-authored-by: Ziqi Fan <[email protected]>
Co-authored-by: oandreeva-nv <[email protected]>
1 parent 5abea1b commit 2d39f1b

16 files changed (+1314, -106 lines)

.gitattributes

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@
 *.[Pp][Nn][Gg] binary
 *.[Zz][Ii][Pp] binary
 *.[Tt][Gg][Zz] binary
+*.fatbin binary
 
 # Exclude test data files from linguist language detection
 lib/llm/tests/data/** linguist-vendored
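
(For reference: Git's binary attribute is a built-in macro for -diff -merge -text, so the compiled *.fatbin kernel images are never text-diffed, line-merged, or end-of-line normalized.)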

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs

Lines changed: 1 addition & 1 deletion
@@ -218,7 +218,7 @@ impl Leader for KvConnectorLeader {
         );
 
         if slot.state() == SlotState::SkippedPrefill || slot.state() == SlotState::SkippedDecode {
-            tracing::warn!("slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early");
+            tracing::debug!("slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early");
             match slot.state() {
                 SlotState::SkippedPrefill => {
                     slot.mark_as_prefilling(self.iteration_counter)?;

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs

Lines changed: 1 addition & 1 deletion
@@ -398,7 +398,7 @@ impl VllmConnectorSlot {
             SlotState::SkippedPrefill => Ok(()), // already skipped
             SlotState::SkippedDecode => Ok(()), // already skipped
             _ => {
-                tracing::warn!("slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}", self.state, self.request_id);
+                tracing::debug!("slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}", self.state, self.request_id);
                 Ok(())
             }
         }

lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py

Lines changed: 1 addition & 3 deletions
@@ -64,9 +64,7 @@ def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
         Args: kv_caches:
             dictionary of layer names, kv cache
         """
-        print(
-            f"KvConnectorWorker.register_kv_caches called with {len(kv_caches)} kv_caches"
-        )
+
         cache_config = self.vllm_config.cache_config
 
         # Create ordered list of (layer_name, tensor) tuples sorted by layer index

lib/llm/build.rs

Lines changed: 75 additions & 1 deletion
@@ -1,16 +1,90 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0
 
+use std::env;
+use std::path::PathBuf;
+
 fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Declare our custom cfg flag to avoid unexpected_cfgs warnings
+    println!("cargo:rustc-check-cfg=cfg(have_vec_copy_fatbin)");
+
     println!("cargo:warning=Building with CUDA KV off");
-    build_protos()
+    build_protos()?;
+
+    // Get FATBIN path and copy it to OUT_DIR for embedding
+    if let Some(fatbin_path) = find_fatbin_file() {
+        // Copy FATBIN to OUT_DIR so we can include it with a predictable path
+        let out_dir = env::var("OUT_DIR").unwrap();
+        let dest_path = PathBuf::from(out_dir).join("vectorized_copy.fatbin");
+
+        if let Err(e) = std::fs::copy(&fatbin_path, &dest_path) {
+            println!("cargo:warning=Failed to copy FATBIN to OUT_DIR: {}", e);
+        } else {
+            // Emit cfg flag for conditional compilation
+            println!("cargo:rustc-cfg=have_vec_copy_fatbin");
+            println!(
+                "cargo:warning=CUDA FATBIN found at: {} - copied to OUT_DIR",
+                fatbin_path.display()
+            );
+        }
+
+        // Tell cargo to rerun if FATBIN file changes
+        println!("cargo:rerun-if-changed={}", fatbin_path.display());
+    } else {
+        println!(
+            "cargo:warning=CUDA FATBIN not found - run 'make fatbin' in cuda_kernels directory"
+        );
+        println!("cargo:warning=Set DYNAMO_FATBIN_PATH env var to specify custom location");
+    }
+
+    // Rerun build if environment variable changes
+    println!("cargo:rerun-if-env-changed=DYNAMO_FATBIN_PATH");
+
+    Ok(())
 }
 
 fn build_protos() -> Result<(), Box<dyn std::error::Error>> {
     tonic_build::compile_protos("src/grpc/protos/kserve.proto")?;
     Ok(())
 }
 
+fn find_fatbin_file() -> Option<PathBuf> {
+    // 1. Check if user specified custom path via environment variable
+    if let Ok(custom_path) = env::var("DYNAMO_FATBIN_PATH") {
+        let fatbin_file = PathBuf::from(custom_path);
+        if fatbin_file.exists() {
+            println!(
+                "cargo:warning=Using custom FATBIN path: {}",
+                fatbin_file.display()
+            );
+            return Some(fatbin_file);
+        } else {
+            println!(
+                "cargo:warning=Custom FATBIN path does not exist: {}",
+                fatbin_file.display()
+            );
+        }
+    }
+
+    // 2. Check standard locations (priority order)
+    let default_paths = [
+        "./src/block_manager/block/transfer/kernels/vectorized_copy.fatbin", // Primary: Next to transfer module
+    ];
+
+    for path in &default_paths {
+        let fatbin_file = PathBuf::from(path);
+        if fatbin_file.exists() {
+            println!(
+                "cargo:warning=Found FATBIN at default location: {}",
+                fatbin_file.display()
+            );
+            return Some(fatbin_file);
+        }
+    }
+
+    None
+}
+
 // NOTE: Preserving this build.rs for reference. We may want to re-enable
 // custom kernel compilation in the future.

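As a point of reference (not part of this commit), a minimal sketch of how the staged FATBIN could be embedded on the consumer side; the constant name is hypothetical, while the OUT_DIR file name and the have_vec_copy_fatbin cfg come from the build script above:

// Hypothetical sketch: embed the kernel image that build.rs staged in OUT_DIR.
// This compiles only when build.rs emitted `cargo:rustc-cfg=have_vec_copy_fatbin`.
#[cfg(have_vec_copy_fatbin)]
pub const VECTORIZED_COPY_FATBIN: &[u8] =
    include_bytes!(concat!(env!("OUT_DIR"), "/vectorized_copy.fatbin"));
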
lib/llm/src/block_manager/block/transfer.rs

Lines changed: 80 additions & 8 deletions
@@ -14,16 +14,14 @@ use crate::block_manager::storage::{
     nixl::{NixlRegisterableStorage, NixlStorage},
 };
 
-use cudarc::driver::CudaStream;
-
 use nixl_sys::NixlDescriptor;
 use nixl_sys::XferOp::{Read, Write};
 use std::ops::Range;
 use tokio::sync::oneshot;
 
 pub use crate::block_manager::storage::{CudaAccessible, Local, Remote};
 pub use async_trait::async_trait;
-pub use context::TransferContext;
+pub use context::{PoolConfig, TransferContext};
 
 /// A block that can be the target of a write
 pub trait Writable {}
@@ -82,6 +80,14 @@ impl NixlTransfer {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CudaTransferMode {
+    /// Use the custom CUDA kernel for G1 <-> G2 transfers
+    Custom,
+    /// Use the default CUDA async memcpy for G1 <-> G2 transfers
+    Default,
+}
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum TransferStrategy {
     Memcpy,
@@ -135,6 +141,33 @@
     }
 }
 
+#[inline]
+fn resolve_cuda_transfer_mode(
+    base_strategy: TransferStrategy,
+    is_contiguous: bool,
+) -> CudaTransferMode {
+    match base_strategy {
+        TransferStrategy::CudaAsyncH2D => {
+            if is_contiguous {
+                CudaTransferMode::Default
+            } else {
+                CudaTransferMode::Custom
+            }
+        }
+        TransferStrategy::CudaAsyncD2H => {
+            if is_contiguous {
+                CudaTransferMode::Default
+            } else {
+                CudaTransferMode::Custom
+            }
+        }
+        other => panic!(
+            "resolve_cuda_strategy called with non-CUDA strategy: {:?}",
+            other
+        ),
+    }
+}
+
 pub fn handle_local_transfer<RB, WB>(
     sources: &[RB],
     targets: &mut [WB],
@@ -162,12 +195,51 @@
         TransferStrategy::CudaAsyncH2D
         | TransferStrategy::CudaAsyncD2H
        | TransferStrategy::CudaAsyncD2D => {
-            for (src, dst) in sources.iter().zip(targets.iter_mut()) {
-                cuda::copy_block(src, dst, ctx.stream().as_ref(), RB::write_to_strategy())?;
+            tracing::debug!(
+                "Transfer: Using CUDA strategy: {:?}",
+                RB::write_to_strategy()
+            );
+
+            if RB::write_to_strategy() == TransferStrategy::CudaAsyncH2D
+                || RB::write_to_strategy() == TransferStrategy::CudaAsyncD2H
+            {
+                let is_contiguous = sources[0].block_data().is_fully_contiguous()
+                    && targets[0].block_data().is_fully_contiguous();
+                let transfer_mode =
+                    resolve_cuda_transfer_mode(RB::write_to_strategy(), is_contiguous);
+
+                match transfer_mode {
+                    CudaTransferMode::Custom => {
+                        let selected_stream = ctx.stream();
+                        cuda::copy_blocks_with_customized_kernel(
+                            sources,
+                            targets,
+                            selected_stream.as_ref(),
+                            &ctx,
+                        )?;
+                    }
+                    CudaTransferMode::Default => {
+                        for (src, dst) in sources.iter().zip(targets.iter_mut()) {
+                            cuda::copy_block(
+                                src,
+                                dst,
+                                ctx.stream().as_ref(),
+                                RB::write_to_strategy(),
+                            )?;
+                        }
+                    }
+                }
+                ctx.cuda_event(tx)?;
+
+                Ok(rx)
+            } else {
+                // Fall back to individual copy for D2D blocks
+                for (src, dst) in sources.iter().zip(targets.iter_mut()) {
+                    cuda::copy_block(src, dst, ctx.stream().as_ref(), RB::write_to_strategy())?;
+                }
+                ctx.cuda_event(tx)?;
+                Ok(rx)
             }
-
-            ctx.cuda_event(tx)?;
-            Ok(rx)
         }
         TransferStrategy::Nixl(transfer_type) => {
             let transfer_fut = nixl::write_blocks_to(sources, targets, &ctx, transfer_type)?;

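To summarize the new dispatch (an illustrative sketch based only on the hunks above, not code from this commit): resolve_cuda_transfer_mode routes non-contiguous pinned<->device transfers to the custom vectorized-copy kernel and keeps contiguous ones on the default async memcpy path, while D2D transfers bypass it entirely and fall back to per-block copy_block:

// Illustrative assertions restating the decision table above.
// Contiguous pinned<->device copies stay on the default memcpy path:
assert_eq!(
    resolve_cuda_transfer_mode(TransferStrategy::CudaAsyncH2D, true),
    CudaTransferMode::Default
);
// Non-contiguous copies go through the custom vectorized kernel:
assert_eq!(
    resolve_cuda_transfer_mode(TransferStrategy::CudaAsyncD2H, false),
    CudaTransferMode::Custom
);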