feat: re-allocate pages dynamically #2024
Conversation
@@ -80,9 +124,13 @@ async fn block_allocator_task(
match cmd {
Might be unrelated, but why are we using this channel + command pattern when the routine is such a simple function? What's the advantage over just using a Mutex or RwLock? Current solution seems unnecessarily complicated to me.
In the rest of the code it's because there is a lot of contention.
In the specific case of this struct there is none, so I agree a Mutex would be a better idea here.
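As a rough illustration of the Mutex-based alternative being discussed, here is a minimal sketch assuming a hypothetical free-list allocator (the actual BlockAllocator in this PR has different fields and methods): the shared state lives behind an Arc<Mutex<...>> and callers lock it directly instead of sending commands to a background task over a channel.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Hypothetical free-list state; the real allocator in the repo differs.
struct BlockAllocatorState {
    free_blocks: VecDeque<u32>,
}

#[derive(Clone)]
struct BlockAllocator {
    state: Arc<Mutex<BlockAllocatorState>>,
}

impl BlockAllocator {
    fn new(num_blocks: u32) -> Self {
        Self {
            state: Arc::new(Mutex::new(BlockAllocatorState {
                free_blocks: (0..num_blocks).collect(),
            })),
        }
    }

    // Lock, pop the requested number of blocks, return None if not enough are free.
    fn allocate(&self, n: usize) -> Option<Vec<u32>> {
        let mut state = self.state.lock().unwrap();
        if state.free_blocks.len() < n {
            return None;
        }
        Some(state.free_blocks.drain(..n).collect())
    }

    // Lock and push blocks back onto the free list.
    fn free(&self, blocks: Vec<u32>) {
        let mut state = self.state.lock().unwrap();
        state.free_blocks.extend(blocks);
    }
}

fn main() {
    let allocator = BlockAllocator::new(8);
    let blocks = allocator.allocate(3).expect("enough free blocks");
    assert_eq!(blocks.len(), 3);
    allocator.free(blocks);
}
```

In the scheduler's async context, a tokio::sync::Mutex (or a std Mutex held only for these short critical sections) would play the same role.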
router/src/infer/v3/scheduler.rs
Outdated
.block_allocation
.as_ref()
.map(|alloc| (alloc.blocks.clone(), alloc.slots.clone()))
.unwrap_or((Vec::new(), Vec::new()));
Suggested change:
-    .unwrap_or((Vec::new(), Vec::new()));
+    .unwrap_or_default();
router/src/infer/v3/scheduler.rs
Outdated
None
}
})
.unwrap_or(None)
map + unwrap_or(None) is called and_then.
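As a minimal illustration of that equivalence, on a made-up option value rather than the scheduler's actual code:

```rust
fn main() {
    let maybe_len: Option<usize> = Some(4);

    // map produces Option<Option<_>>, which unwrap_or(None) then flattens...
    let a: Option<usize> = maybe_len
        .map(|len| if len > 2 { Some(len * 2) } else { None })
        .unwrap_or(None);

    // ...which is exactly what and_then does in one step.
    let b: Option<usize> =
        maybe_len.and_then(|len| if len > 2 { Some(len * 2) } else { None });

    assert_eq!(a, b);
}
```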
Yes, all this code will be refactored; it's just something I did yesterday evening to test the idea.
The PR should be marked as draft.
router/src/infer/v3/scheduler.rs
Outdated
})
.collect();

for id in ids.iter() {
Suggested change:
-for id in ids.iter() {
+for id in &ids {
router/src/infer/v3/scheduler.rs
Outdated
tracing::error!("{err}");

// unwrap_or is valid here as we don't care if the receiver is gone.
entry.response_tx.send(Err(err)).unwrap_or(());
If the intent is just to ignore the return value, you can use a let _ binding.
Suggested change:
-entry.response_tx.send(Err(err)).unwrap_or(());
+let _ = entry.response_tx.send(Err(err));
@@ -201,6 +195,9 @@ def from_tokenized(
input_length = len(tokenized_input)
input_lengths.append(input_length)

speculative_length = get_speculate()
speculative_length = 0 if speculative_length is None else speculative_length
Suggested change:
-speculative_length = 0 if speculative_length is None else speculative_length
+speculative_length = get_speculate() or 0
}

pub(crate) async fn extend(&mut self, current_length: u32) -> Result<(), AllocationError> {
    let remaining_tokens = max(self.prompt_tokens + self.decode_tokens - current_length, 1);
Why a minimum of 1 instead of just returning? And since you are using u32, the subtraction might overflow and produce a very big result. I suggest using a signed integer for any numeric calculation.
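To make the underflow concern concrete, here is a small sketch with made-up token counts (not the scheduler's real values); saturating_sub or doing the arithmetic in a signed type are the usual ways to avoid it:

```rust
use std::cmp::max;

fn main() {
    let prompt_tokens: u32 = 10;
    let decode_tokens: u32 = 20;
    // A current_length larger than the token budget makes the unsigned subtraction wrap.
    let current_length: u32 = 40;

    // Debug builds panic here; release builds silently wrap to a huge number.
    // let remaining = max(prompt_tokens + decode_tokens - current_length, 1);

    // Saturating arithmetic clamps at 0 instead of wrapping.
    let remaining = max((prompt_tokens + decode_tokens).saturating_sub(current_length), 1);
    assert_eq!(remaining, 1);

    // Alternatively, compute in a signed type and clamp afterwards.
    let signed = i64::from(prompt_tokens) + i64::from(decode_tokens) - i64::from(current_length);
    assert_eq!(signed.max(1), 1);
}
```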
It's a quick hack, but it will not be present in the final code.
response_sender,
} => {
    let decode_tokens = min(decode_tokens, block_size);
What's the rationale for this?
@zirconium-n Thanks for your input. Can you provide a bit of background on yourself? Who are you, and how are you trying to help here? Your comments definitely seem on point on some aspects, but it feels very off on our side to have someone we have no connection with barge in and comment on the code as authoritatively as you are doing. Starting with an introduction on where you come from and what your goal is will go a long way towards us replying in a positive manner.
proto/v3/generate.proto
Outdated
@@ -198,13 +198,24 @@ message Generation {
optional GeneratedText generated_text = 4;
/// Top tokens
repeated Tokens top_tokens = 5;
/// Current length of the request: prompt tokens + number of generated tokens until this point
uint32 current_length = 6;
This should be cached_tokens instead.
proto/v3/generate.proto
Outdated
}

message FilterBatchRequest {
/// Batch ID
uint64 batch_id = 1;
/// Requests to keep
repeated uint64 request_ids = 2;
repeated UpdatedRequest updated_requests = 2;
@Narsil, not particularly happy about this name. Do you have a better idea?
The way it works is that we send a list of requests <= the current requests in the batch.
The requests that are not part of this list are dropped from the cached batch.
We take the blocks and slots in this request as the source of truth and re-allocate the slots and block_table tensors in the cached batch (allowing blocks and slots to be updated).
KeptRequests?
s/blocks/new_blocks/? (They are only defined when new blocks are being allocated, right?)
}

pub(crate) async fn extend(&mut self, current_length: u32) -> Result<(), AllocationError> {
    let remaining_tokens = max(self.prompt_tokens + self.decode_tokens - current_length, 1);
current_length is wrong here; we need to use cached_tokens instead.
This still works because of the max(1), but we should remove it.
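A hypothetical, simplified sketch of what that fix could look like, with stand-in types that are not the PR's actual ones: pass cached_tokens instead of the raw current length and use saturating_sub, which also removes the need for the max(.., 1) workaround.

```rust
#[derive(Debug)]
struct AllocationError;

// Simplified stand-in for the PR's allocation state.
struct BlockAllocation {
    prompt_tokens: u32,
    decode_tokens: u32,
}

impl BlockAllocation {
    fn extend(&mut self, cached_tokens: u32) -> Result<(), AllocationError> {
        // saturating_sub clamps at 0, so no max(.., 1) workaround is needed.
        let remaining_tokens =
            (self.prompt_tokens + self.decode_tokens).saturating_sub(cached_tokens);
        if remaining_tokens == 0 {
            // Nothing left to allocate for this request.
            return Ok(());
        }
        // ... allocate enough additional blocks to cover remaining_tokens ...
        Ok(())
    }
}

fn main() {
    let mut alloc = BlockAllocation { prompt_tokens: 10, decode_tokens: 20 };
    assert!(alloc.extend(40).is_ok());
}
```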
response_sender,
} => {
    let decode_tokens = min(decode_tokens, block_size);
Allocate prompt tokens + min(decode_tokens, block_size), so prompt tokens + 1 or 2 blocks.
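To illustrate that initial allocation policy with hypothetical numbers (block_size, token counts, and the ceiling division below are assumptions, not the repo's code):

```rust
fn main() {
    let block_size: u32 = 16;
    let prompt_tokens: u32 = 130;
    let decode_tokens: u32 = 2048;

    // Only reserve up to one block's worth of decode tokens up front; the rest
    // is allocated later, page by page, as the request actually needs it.
    let initial_tokens = prompt_tokens + decode_tokens.min(block_size);

    // Ceiling division turns a token count into a block count.
    let initial_blocks = (initial_tokens + block_size - 1) / block_size;

    println!("allocate {initial_blocks} blocks ({initial_tokens} tokens) up front");
}
```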
Ah. Sorry if the comments bothered you. I'm playing with a fork of this repo myself and have been messing with this particular part of the code recently (and may eventually open a PR myself). I noticed there are changes happening upstream and want to keep up with the latest changes, so I thought I might as well provide some help. By no means am I trying to be rude or sound authoritative; I'm just trying to provide some ergonomic nits and ask some questions. I will not engage further if this is unwanted.
No, this is fine, you can continue; just bear in mind that we might not know all this beforehand :). As for the core of the changes here, it's about becoming optimistic in memory allocation (instead of the current pessimistic approach): allocating all possible memory for a given request up front vs allocating later and having to deal with potential OOM situations.
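A rough sketch of the contrast between the two strategies, using a hypothetical fixed-size block pool (none of these names come from the actual text-generation-inference code):

```rust
// Ceiling division: how many blocks are needed for a given token count.
fn blocks_needed(tokens: u32, block_size: u32) -> u32 {
    (tokens + block_size - 1) / block_size
}

fn main() {
    let block_size = 16u32;
    let prompt_tokens = 100u32;
    let max_new_tokens = 2048u32;

    // Pessimistic: reserve enough blocks for the worst case (max_new_tokens)
    // when the request is first scheduled. Nothing can OOM later, but most
    // requests stop early at an EOS token and the reserved pages sit unused.
    let pessimistic = blocks_needed(prompt_tokens + max_new_tokens, block_size);

    // Optimistic: reserve the prompt plus roughly one block of decode tokens,
    // then extend the allocation each time the request fills a page. The pool
    // is used much more densely, but an extension can now fail (OOM) and the
    // scheduler has to handle that case, e.g. by re-queueing the request.
    let optimistic = blocks_needed(prompt_tokens + block_size.min(max_new_tokens), block_size);

    println!("pessimistic up-front blocks: {pessimistic}"); // 135
    println!("optimistic up-front blocks:  {optimistic}"); // 8
}
```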
Force-pushed from 0dbd6b3 to 73c3903.
I think this could be interesting, especially in the context of this PR: https://buildkite.com/vllm/performance-benchmark/builds/4068
Today we allocate all pages at once when first scheduling the request.
This can lead to under-utilisation, as a lot of requests terminate with an EOS token before reaching max_new_tokens (see the p50 in prod).
This PR re-allocates pages dynamically each time a request finishes a page.
Benchmark: ShareGPT with max_new_tokens forced to 2048 tokens.
Queue time is greatly improved (p99: 32s => 1.34s) and throughput is more than doubled (1.49 => 3.37).
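A schematic sketch of that mechanism, with hypothetical types and numbers (not the PR's code): extend a request's allocation only when it is about to cross a page boundary, and surface the case where no block is free so the scheduler can handle it.

```rust
struct Request {
    current_tokens: u32,
    allocated_blocks: u32,
}

fn decode_step(req: &mut Request, block_size: u32, free_blocks: &mut u32) -> Result<(), &'static str> {
    // The next token would not fit in the currently allocated pages:
    // try to grab one more block before generating it.
    if req.current_tokens + 1 > req.allocated_blocks * block_size {
        if *free_blocks == 0 {
            // Out of memory: the scheduler must evict / re-queue this request.
            return Err("no free blocks");
        }
        *free_blocks -= 1;
        req.allocated_blocks += 1;
    }
    req.current_tokens += 1;
    Ok(())
}

fn main() {
    let block_size = 16u32;
    let mut free_blocks = 2u32;
    // Start with the prompt's pages already allocated (2 blocks for 20 tokens).
    let mut req = Request { current_tokens: 20, allocated_blocks: 2 };

    // Generate tokens until the pool runs out of blocks.
    while decode_step(&mut req, block_size, &mut free_blocks).is_ok() {}
    println!("held {} tokens before running out of blocks", req.current_tokens);
}
```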