4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions lib/bindings/python/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions lib/bindings/python/Cargo.toml
@@ -44,6 +44,7 @@ rand = { version = "0.9" }
socket2 = { version = "0.6" }
serde = { version = "1" }
serde_json = { version = "1.0.138" }
smallvec = { version = "1.15.1", features = ["serde"] }
thiserror = { version = "2.0" }
tokio = { version = "1.46.0", features = ["full"] }
tokio-stream = { version = "0" }
24 changes: 17 additions & 7 deletions lib/bindings/python/rust/llm/block_manager.rs
@@ -161,8 +161,12 @@ impl BlockManager {
})
}

fn block_size(&self) -> usize {
self.inner.block_size()
fn engine_block_size(&self) -> usize {
self.inner.engine_block_size()
}

fn offload_block_size(&self) -> usize {
self.inner.offload_block_size()
}

fn init_controller(&mut self, component: Component) -> PyResult<()> {
@@ -214,14 +218,16 @@ impl BlockManager {
pub struct BlockManagerBuilder {
worker_id: u64,
leader: Option<distributed::KvbmLeader>,
page_size: usize,
offload_page_size: usize,
engine_page_size: usize,
disable_device_pool: bool,
Comment on lines +221 to 223 (Contributor):

🛠️ Refactor suggestion

offload_page_size is never used; risk of silent misconfiguration

The builder stores offload_page_size but never propagates it into KvBlockManagerConfig. This makes the field effectively dead and prevents callers from controlling the offload size via the builder.

Apply this diff inside build() to derive and set the ratio from the two sizes (with validation):

@@
-        let model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder()
+        let model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder()
             .num_layers(1)
             .outer_dim(1)
-            .page_size(self.engine_page_size)
+            .page_size(self.engine_page_size)
             .inner_dim(1)
             .build()?;
 
         config = config.model(model_config);
 
+        // Derive and apply offload ratio from sizes
+        if self.engine_page_size == 0 || self.offload_page_size == 0 {
+            return Err(anyhow::anyhow!("engine/offload page sizes must be > 0"));
+        }
+        if self.offload_page_size % self.engine_page_size != 0 {
+            return Err(anyhow::anyhow!(
+                "offload_page_size ({}) must be a multiple of engine_page_size ({})",
+                self.offload_page_size,
+                self.engine_page_size
+            ));
+        }
+        let ratio = self.offload_page_size / self.engine_page_size;
+        if !ratio.is_power_of_two() {
+            return Err(anyhow::anyhow!(
+                "offload/engine page-size ratio ({}) must be a power of two",
+                ratio
+            ));
+        }
+        // Requires KvBlockManagerConfigBuilder::offload_block_size_ratio(usize)
+        config = config.offload_block_size_ratio(ratio);

Additionally, consider validating an upper bound if required by pools (e.g., offload ≤ 1024).

Committable suggestion skipped: line range outside the PR's diff.
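
As a side note, the derivation and checks in the diff above can be factored into a small helper; the sketch below also folds in the upper-bound check mentioned above. It is only an illustration: the MAX_OFFLOAD_RATIO constant and the helper name are hypothetical, and the resulting ratio would still need to be fed into the proposed offload_block_size_ratio builder method.

// Illustrative helper only; MAX_OFFLOAD_RATIO is an assumed limit, not a real crate constant.
const MAX_OFFLOAD_RATIO: usize = 1024;

fn derive_offload_ratio(engine_page_size: usize, offload_page_size: usize) -> anyhow::Result<usize> {
    if engine_page_size == 0 || offload_page_size == 0 {
        anyhow::bail!("engine/offload page sizes must be > 0");
    }
    if offload_page_size % engine_page_size != 0 {
        anyhow::bail!(
            "offload_page_size ({}) must be a multiple of engine_page_size ({})",
            offload_page_size,
            engine_page_size
        );
    }
    let ratio = offload_page_size / engine_page_size;
    if !ratio.is_power_of_two() {
        anyhow::bail!("offload/engine page-size ratio ({}) must be a power of two", ratio);
    }
    if ratio > MAX_OFFLOAD_RATIO {
        anyhow::bail!("offload/engine ratio ({}) exceeds the assumed pool limit ({})", ratio, MAX_OFFLOAD_RATIO);
    }
    Ok(ratio)
}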

}

impl BlockManagerBuilder {
pub fn new() -> Self {
Self {
page_size: 32, // default consistent with BlockManager::new
engine_page_size: 32, // default consistent with BlockManager::new
offload_page_size: 1024, // default consistent with BlockManager::new
..Default::default()
}
}
@@ -230,8 +236,12 @@ impl BlockManagerBuilder {
self.worker_id = id;
self
}
pub fn page_size(mut self, ps: usize) -> Self {
self.page_size = ps;
pub fn engine_page_size(mut self, ps: usize) -> Self {
self.engine_page_size = ps;
self
}
pub fn offload_page_size(mut self, ps: usize) -> Self {
self.offload_page_size = ps;
self
}
pub fn leader(mut self, l: distributed::KvbmLeader) -> Self {
@@ -267,7 +277,7 @@ impl BlockManagerBuilder {
let model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder()
.num_layers(1)
.outer_dim(1)
.page_size(self.page_size)
.page_size(self.engine_page_size)
Review comment (Contributor):

💡 Verification agent

🧩 Analysis chain

Only engine page size is propagated; offload is ignored

build() writes model.page_size but never configures offload_block_size_ratio, so offload size falls back to the global default regardless of the builder’s offload_page_size.

Follow the ratio derivation diff in my earlier comment to ensure offload_page_size actually takes effect.

To confirm the API path exists, run:


🏁 Script executed:

#!/bin/bash
# Verify the builder exposes offload_block_size_ratio and find its usage.
rg -n --glob 'lib/**' 'offload_block_size_ratio' -C2
# Confirm current usage of offload_page_size and engine_page_size.
rg -n --glob 'lib/**' 'offload_page_size|engine_page_size' -C2

Length of output: 38270


Propagate offload_page_size in Python binding build()
In lib/bindings/python/rust/llm/block_manager.rs around line 292, after

    .page_size(self.engine_page_size)

add

    .offload_page_size(self.offload_page_size)

so the builder applies the configured offload size rather than falling back to the default.

🤖 Prompt for AI Agents
In lib/bindings/python/rust/llm/block_manager.rs around line 292, the builder
call stops at .page_size(self.engine_page_size) and doesn't propagate the
configured offload size; add a chained
.offload_page_size(self.offload_page_size) immediately after the page_size call
so the builder uses the self.offload_page_size value (ensure the field exists
and is in scope) instead of falling back to the default.
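
Taken together, the end state of build() would look roughly like the fragment below. This is a sketch of the reviewer's proposal, not a confirmed API: whether the model-config builder already exposes offload_page_size is not visible in this diff, and the alternative is the ratio-based offload_block_size_ratio approach from the earlier comment.

// Sketch only: .offload_page_size(...) is the reviewer-proposed call, not a verified builder method.
let model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder()
    .num_layers(1)
    .outer_dim(1)
    .page_size(self.engine_page_size)
    .offload_page_size(self.offload_page_size) // proposed: propagate the offload size
    .inner_dim(1)
    .build()?;

config = config.model(model_config);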

.inner_dim(1)
.build()?;

lib/bindings/python/rust/llm/block_manager/distributed/worker.rs
@@ -176,7 +176,8 @@ impl KvbmWorker {
let config = KvbmWorkerConfig::builder()
.drt(drt)
.num_device_blocks(num_device_blocks)
.page_size(page_size)
.offload_page_size(page_size)
.engine_page_size(page_size)
Comment on lines 176 to +180 (Contributor):

⚠️ Potential issue

Configuration uses the same value for both page sizes

The configuration currently sets both offload_page_size and engine_page_size to the same value (page_size), which appears to contradict the PR's stated objective of having the host and disk pools use larger block sizes than the device block pool. This likely requires the Python caller to pass separate values for these parameters.

Based on the PR objectives stating that offload blocks should be larger than device blocks, the Python bindings should accept two distinct page size parameters. Consider updating the function signature to accept both engine_page_size and offload_page_size:

-    #[pyo3(signature = (num_device_blocks, page_size, tensors, device_id=0, dtype_width_bytes=2, drt=None, layout_blocking=false))]
+    #[pyo3(signature = (num_device_blocks, engine_page_size, offload_page_size, tensors, device_id=0, dtype_width_bytes=2, drt=None, layout_blocking=false))]
     fn new(
         num_device_blocks: usize,
-        page_size: usize,
+        engine_page_size: usize,
+        offload_page_size: usize,
         tensors: Vec<Py<PyAny>>,

And update the configuration accordingly:

         let config = KvbmWorkerConfig::builder()
             .drt(drt)
             .num_device_blocks(num_device_blocks)
-            .offload_page_size(page_size)
-            .engine_page_size(page_size)
+            .offload_page_size(offload_page_size)
+            .engine_page_size(engine_page_size)
📝 Committable suggestion


Suggested change
let config = KvbmWorkerConfig::builder()
.drt(drt)
.num_device_blocks(num_device_blocks)
.page_size(page_size)
.offload_page_size(page_size)
.engine_page_size(page_size)
// Update the Python binding signature to accept two page sizes
#[pyo3(signature = (num_device_blocks, engine_page_size, offload_page_size, tensors, device_id=0, dtype_width_bytes=2, drt=None, layout_blocking=false))]
fn new(
num_device_blocks: usize,
engine_page_size: usize,
offload_page_size: usize,
tensors: Vec<Py<PyAny>>,
device_id: usize,
dtype_width_bytes: usize,
drt: Option<...>,
layout_blocking: bool,
) -> PyResult<Self> {
// ...
let config = KvbmWorkerConfig::builder()
.drt(drt)
.num_device_blocks(num_device_blocks)
.offload_page_size(offload_page_size)
.engine_page_size(engine_page_size)
// ...
}
🤖 Prompt for AI Agents
In lib/bindings/python/rust/llm/block_manager/distributed/worker.rs around lines
137 to 141, the code currently assigns the same page_size to both
offload_page_size and engine_page_size; update the worker API and configuration
to accept distinct engine_page_size and offload_page_size values so offload
(host/disk) can be larger than device blocks. Change the Rust function signature
(and matching Python binding) to take two separate page size parameters, use
engine_page_size for .engine_page_size(...) and offload_page_size for
.offload_page_size(...), and update all callers/tests to pass the appropriate
values. Ensure input validation (e.g., offload >= device) or clear documentation
for callers if needed.
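
If the two-parameter signature above is adopted, a minimal guard at the binding boundary could look like the sketch below. It is illustrative only: it assumes the suggested engine_page_size/offload_page_size parameters and uses pyo3's standard PyValueError.

use pyo3::exceptions::PyValueError;

// Inside fn new(...), before building KvbmWorkerConfig:
if engine_page_size == 0 || offload_page_size == 0 || offload_page_size % engine_page_size != 0 {
    return Err(PyValueError::new_err(format!(
        "offload_page_size ({offload_page_size}) must be a non-zero multiple of engine_page_size ({engine_page_size})"
    )));
}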

.tensors(vllm_tensors)
.device_id(device_id)
.dtype_width_bytes(dtype_width_bytes)
35 changes: 20 additions & 15 deletions lib/bindings/python/rust/llm/block_manager/vllm.rs
@@ -83,7 +83,10 @@ impl KvbmCacheManager {
#[new]
#[pyo3(signature = (block_manager))]
pub fn new(block_manager: PyBlockManager) -> PyResult<Self> {
let slot_manager = Mutex::new(SlotManager::new(block_manager.block_size()));
let slot_manager = Mutex::new(SlotManager::new(
block_manager.engine_block_size(),
block_manager.offload_block_size(),
));
Ok(Self {
block_manager,
slot_manager,
@@ -286,7 +289,7 @@ pub struct GenericSlotUpdate<R> {
pub num_new_tokens: usize,

/// The number of new computed tokens in the request.
/// The `num_new_tokens / block_size` should be equal to the length of the `new_computed_blocks`,
/// The `num_new_tokens / engine_block_size` should be equal to the length of the `new_computed_blocks`,
/// it may have a remainder for the partial block state.
/// Note: this field is solely tied to the `new_computed_blocks` field and not used when `tokens_to_append` is provided.
/// The name might be confusing, but the name matches the vLLM implementation.
@@ -401,15 +404,17 @@ impl SlotError {

pub struct SlotManager<R: RequestKey> {
slots: HashMap<R, Slot<DeviceStorage, Logical<DistributedLeaderWorkerResources>>>,
block_size: usize,
engine_block_size: usize,
offload_block_size: usize,
}

impl<R: RequestKey> SlotManager<R> {
/// Creates a new slot manager.
pub fn new(block_size: usize) -> Self {
pub fn new(engine_block_size: usize, offload_block_size: usize) -> Self {
Self {
slots: HashMap::new(),
block_size,
engine_block_size,
offload_block_size,
}
}

@@ -436,7 +441,7 @@ impl<R: RequestKey> SlotManager<R> {
if !self.slots.contains_key(request_id) {
self.slots.insert(
request_id.clone(),
Slot::new(tokens.into(), self.block_size, salt_hash),
Slot::new(tokens.into(), self.engine_block_size, salt_hash),
);
Comment on lines +444 to 445 (Contributor):

🛠️ Refactor suggestion

Slot initialization should carry both sizes to support offload hashing

Passing only engine_block_size makes it hard for Slot to produce offload-sized sequence hashes needed for host/disk caches.

If Slot::new can be extended, pass both sizes:

-                Slot::new(tokens.into(), self.engine_block_size, salt_hash),
+                Slot::new(
+                    tokens.into(),
+                    self.engine_block_size,
+                    self.offload_block_size,
+                    salt_hash,
+                ),

Otherwise, provide a method on Slot to compute offload-sized sequence hashes on demand.

📝 Committable suggestion


Suggested change
Slot::new(tokens.into(), self.engine_block_size, salt_hash),
);
@@ lib/bindings/python/rust/llm/block_manager/vllm.rs
Slot::new(
tokens.into(),
self.engine_block_size,
self.offload_block_size,
salt_hash,
),
);
🤖 Prompt for AI Agents
In lib/bindings/python/rust/llm/block_manager/vllm.rs around lines 432-433, the
Slot is being created with only engine_block_size which prevents Slot from
producing offload-sized sequence hashes; either extend Slot::new to accept both
sizes and change this call to pass engine_block_size and offload_block_size
(plus salt_hash), or if modifying Slot::new is undesirable, add a method on Slot
(e.g., compute_offload_sequence_hash(offload_block_size)) and call that after
construction so Slot can produce offload-sized sequence hashes for host/disk
caches.
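
For the second option (computing offload-sized hashes on demand instead of storing both sizes), a rough standalone sketch follows. Every name here is hypothetical and the chained DefaultHasher scheme is only a stand-in; the block manager's real sequence-hashing scheme would have to be reused for the hashes to actually match the host/disk pools.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical helper: hash full offload-sized blocks of a token sequence,
// chaining each block hash to the previous one, seeded by the salt hash.
fn offload_sequence_hashes(tokens: &[u32], offload_block_size: usize, salt_hash: u64) -> Vec<u64> {
    let mut hashes = Vec::new();
    let mut parent = salt_hash;
    // A trailing partial block produces no hash, mirroring full-block matching.
    for block in tokens.chunks_exact(offload_block_size) {
        let mut hasher = DefaultHasher::new();
        parent.hash(&mut hasher);
        block.hash(&mut hasher);
        parent = hasher.finish();
        hashes.push(parent);
    }
    hashes
}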

tracing::debug!(
request_id,
@@ -498,7 +503,7 @@ impl<R: RequestKey> SlotManager<R> {
tracing::debug!(
request_id,
"applying {} cache-hit tokens",
blocks.len() * self.block_size
blocks.len() * self.engine_block_size
);
slot.initialize_with_device_matches(blocks)?;
}
@@ -566,9 +571,9 @@ impl<R: RequestKey> SlotManager<R> {
match self.slots.remove(request_id) {
Some(slot) => {
let isl = slot.num_tokens(SlotPosition::Prefill);
let isl_device = slot.num_blocks_cached_from_device() * self.block_size;
let isl_host = slot.num_blocks_cached_from_host() * self.block_size;
let isl_disk = slot.num_blocks_cached_from_disk() * self.block_size;
let isl_device = slot.num_blocks_cached_from_device() * self.engine_block_size;
let isl_host = slot.num_blocks_cached_from_host() * self.offload_block_size;
let isl_disk = slot.num_blocks_cached_from_disk() * self.offload_block_size;
tracing::info!(
request_id,
"request complete isl: {} - cache hits: device: {}, host: {}, disk: {} - prefilled: {}",
@@ -603,14 +608,14 @@ impl<R: RequestKey> SlotManager<R> {
assert!(num_computed_tokens <= request_num_tokens);

// early exit if we cannot match full block
if (request_num_tokens - num_computed_tokens) < self.block_size {
if (request_num_tokens - num_computed_tokens) < self.engine_block_size {
return Ok((0, false));
}

// num_computed_tokens represents the number of tokens already on the device
// this must be a multiple of the block size
let num_device_blocks = num_computed_tokens / self.block_size;
debug_assert_eq!(num_computed_tokens % self.block_size, 0);
let num_device_blocks = num_computed_tokens / self.engine_block_size;
debug_assert_eq!(num_computed_tokens % self.engine_block_size, 0);

// get the sequence hashes for the device matched tokens
let sequence_hashes = slot.sequence_hashes(SlotPosition::All);
@@ -661,7 +666,7 @@ impl<R: RequestKey> SlotManager<R> {
return Ok((0, false));
}

let mut num_new_matched_tokens = num_matched_blocks * self.block_size;
let mut num_new_matched_tokens = num_matched_blocks * self.engine_block_size;

// we are on a block boundary, so we need to throw away the last block
if num_computed_tokens + num_new_matched_tokens == request_num_tokens {
@@ -681,7 +686,7 @@ impl<R: RequestKey> SlotManager<R> {
}

// decrement the number of new matched tokens by the block size
num_new_matched_tokens -= self.block_size;
num_new_matched_tokens -= self.engine_block_size;
}

slot.store_onboard_blocks(host_blocks, disk_blocks);
@@ -79,7 +79,8 @@ pub trait Leader: Send + Sync + std::fmt::Debug {
#[derive(Debug)]
pub struct KvConnectorLeader {
slot_manager: Arc<OnceLock<ConnectorSlotManager<String>>>,
block_size: usize,
engine_page_size: usize,
offload_page_size: usize,
inflight_requests: HashSet<String>,
onboarding_slots: HashSet<String>,
iteration_counter: u64,
@@ -90,7 +91,8 @@ impl KvConnectorLeader {
fn new(
worker_id: String,
drt: PyDistributedRuntime,
page_size: usize,
engine_page_size: usize,
offload_page_size: usize,
leader_py: PyKvbmLeader,
) -> Self {
tracing::info!(
@@ -127,7 +129,8 @@ impl KvConnectorLeader {
let block_manager = match BlockManagerBuilder::new()
.worker_id(0)
.leader(leader_py)
.page_size(page_size)
.engine_page_size(engine_page_size)
.offload_page_size(offload_page_size)
.disable_device_pool(false)
.build()
.await
@@ -169,7 +172,8 @@ impl KvConnectorLeader {

Self {
slot_manager: slot_manager_cell,
block_size: page_size,
engine_page_size,
offload_page_size,
inflight_requests: HashSet::new(),
onboarding_slots: HashSet::new(),
iteration_counter: 0,
@@ -204,7 +208,7 @@ impl Leader for KvConnectorLeader {
);

// the number of device matched tokens should be less than or equal to the number of tokens in the request
debug_assert!(num_computed_tokens % self.block_size == 0);
debug_assert!(num_computed_tokens % self.engine_page_size == 0);

let shared_slot = self.slot_manager().get_slot(&request_id)?;
let mut slot = shared_slot
@@ -234,7 +238,7 @@ impl Leader for KvConnectorLeader {
}

// early exit if we cannot match full block
if (slot.sequence().total_tokens() - num_computed_tokens) < self.block_size {
if (slot.sequence().total_tokens() - num_computed_tokens) < self.offload_page_size {
return Ok((0, false));
}

@@ -245,7 +249,9 @@ impl Leader for KvConnectorLeader {
// return the number of external tokens that are ready for onboarding
// we always return true here as we always asynchronously onboard matched blocks
if let SlotState::OnboardStaged(num_external_tokens) = slot.state() {
debug_assert!((num_computed_tokens + num_external_tokens) % self.block_size == 0);
debug_assert!(
(num_computed_tokens + num_external_tokens) % self.offload_page_size == 0
);
tracing::debug!(
request_id = request_id,
"scheduling onboarding for {} external tokens",
@@ -289,7 +295,7 @@ impl Leader for KvConnectorLeader {
// the second call will show num_external_tokens == 0
// this call is just letting us know the other blocks that are being used for the remainder of the prefill
if num_external_tokens > 0 {
let num_computed_tokens = block_ids.len() * self.block_size - num_external_tokens;
let num_computed_tokens = block_ids.len() * self.engine_page_size - num_external_tokens;
slot.record_cached_device_tokens(num_computed_tokens);
slot.advance_computed_position(num_computed_tokens)?;

@@ -549,11 +555,12 @@ pub struct PyKvConnectorLeader {
#[pymethods]
impl PyKvConnectorLeader {
#[new]
#[pyo3(signature = (worker_id, drt, page_size, leader))]
#[pyo3(signature = (worker_id, drt, engine_page_size, offload_page_size, leader))]
pub fn new(
worker_id: String,
drt: PyDistributedRuntime,
page_size: usize,
engine_page_size: usize,
offload_page_size: usize,
leader: PyKvbmLeader,
) -> Self {
let enable_kvbm_record = std::env::var("ENABLE_KVBM_RECORD")
@@ -562,10 +569,20 @@ impl PyKvConnectorLeader {

let connector_leader: Box<dyn Leader> = if enable_kvbm_record {
Box::new(recorder::KvConnectorLeaderRecorder::new(
worker_id, drt, page_size, leader,
worker_id,
drt,
engine_page_size,
offload_page_size,
leader,
))
} else {
Box::new(KvConnectorLeader::new(worker_id, drt, page_size, leader))
Box::new(KvConnectorLeader::new(
worker_id,
drt,
engine_page_size,
offload_page_size,
leader,
))
};
Self { connector_leader }
}
@@ -88,7 +88,8 @@ impl KvConnectorLeaderRecorder {
pub fn new(
worker_id: String,
drt: PyDistributedRuntime,
page_size: usize,
engine_page_size: usize,
offload_page_size: usize,
leader_py: PyKvbmLeader,
) -> Self {
tracing::info!(
@@ -143,7 +144,8 @@ impl KvConnectorLeaderRecorder {
let block_manager = match BlockManagerBuilder::new()
.worker_id(0)
.leader(leader_py)
.page_size(page_size)
.engine_page_size(engine_page_size)
.offload_page_size(offload_page_size)
.disable_device_pool(false)
.build()
.await
@@ -185,7 +187,8 @@ impl KvConnectorLeaderRecorder {

let connector_leader = KvConnectorLeader {
slot_manager: slot_manager_cell,
block_size: page_size,
engine_page_size,
offload_page_size,
inflight_requests: HashSet::new(),
onboarding_slots: HashSet::new(),
iteration_counter: 0,