3 changes: 2 additions & 1 deletion lmdeploy/pytorch/engine/model_agent.py
@@ -1042,7 +1042,8 @@ def sleep(self, level: int = 1):
         self.cache_engine = None
         self.reset_graph_runner()
         device = 'cpu' if level == 1 else 'meta'
-        self.patched_model.get_model().to(device=device)
+        self.patched_model.get_model().to(device=device, non_blocking=True)
+        torch.cuda.synchronize()
         torch.cuda.empty_cache()
 
     @torch.inference_mode()
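Note on the model_agent.py change: to(device, non_blocking=True) may return before the weight copy has finished, so the added torch.cuda.synchronize() waits for the transfer before empty_cache() tries to release the freed device blocks. A minimal standalone sketch of the same pattern (the Linear module is only a stand-in for the patched model, and a CUDA device is assumed):

import torch

model = torch.nn.Linear(4096, 4096).cuda()  # stand-in for patched_model.get_model()
model.to('cpu', non_blocking=True)          # the copy may still be in flight when this returns
torch.cuda.synchronize()                    # wait for the copy to complete
torch.cuda.empty_cache()                    # now the cached device blocks can actually be freed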
16 changes: 16 additions & 0 deletions lmdeploy/serve/async_engine.py
@@ -332,6 +332,8 @@ def __init__(self,
         else:
             raise ValueError(f'unsupported backend {backend}')
         self.backend_config = self.engine.engine_config
+        self.is_sleeping = backend_config.empty_init
+        self.sleeping_tags: set[str] = set() if not backend_config.empty_init else {'weights', 'kv_cache'}
         logger.info(f'updated backend_config={self.backend_config}')
 
         # parameters for member functions
@@ -477,6 +479,8 @@ def sleep(self, level: int = 1):
                discard both the model weights and the kv cache.
         """
         self.engine.sleep(level)
+        self.sleeping_tags = {'weights', 'kv_cache'}
+        self.is_sleeping = True
 
     def wakeup(self, tags: Optional[List[str]] = None):
         """Wake up the model.
@@ -488,11 +492,23 @@ def wakeup(self, tags: Optional[List[str]] = None):
                wake_up should be called with all tags (or None) before the
                engine is used again.
         """
+        if tags:
+            for tag in tags:
+                if tag not in self.sleeping_tags:
+                    logger.warning(f'tag {tag} not in sleeping tags {self.sleeping_tags}')
+                    return
         self.engine.wakeup(tags)
         # for TM backend, sleep/wakeup will reset gateway, therefore we need to rebuild instance
         if self.backend == 'turbomind' and (tags is None or 'kv_cache' in tags):
             self.instances = [self.engine.create_instance() for _ in range(self.instance_num)]
             self.free_insts = None
+        if tags:
+            for tag in tags:
+                self.sleeping_tags.remove(tag)
+        else:
+            self.sleeping_tags.clear()
+        if not self.sleeping_tags:
+            self.is_sleeping = False
 
     def _get_limiter(self):
         if not self.limiter:
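Taken together, the async_engine.py changes track which parts of the engine are still asleep (and the __init__ hunk shows that an engine created with backend_config.empty_init starts out already sleeping). This makes a staged wakeup possible: reload the weights first, rebuild the kv cache later. A rough usage sketch from the pipeline side, assuming a PyTorch-backend model (the model id is only an example):

from lmdeploy import pipeline, PytorchEngineConfig

pipe = pipeline('internlm/internlm2_5-7b-chat', backend_config=PytorchEngineConfig())
pipe.sleep(level=1)             # sleeping_tags becomes {'weights', 'kv_cache'}, is_sleeping is True
# ... update or swap the weights externally ...
pipe.wakeup(tags=['weights'])   # 'weights' leaves sleeping_tags; the kv cache is still released
pipe.wakeup(tags=['kv_cache'])  # sleeping_tags is now empty, so is_sleeping flips back to False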
6 changes: 6 additions & 0 deletions lmdeploy/serve/openai/api_server.py
@@ -1105,6 +1105,12 @@ async def wakeup(raw_request: Request = None):
     return Response(status_code=200)
 
 
+@router.get('/is_sleeping', dependencies=[Depends(check_api_key)])
+async def is_sleeping(raw_request: Request = None):
+    is_sleeping = VariableInterface.async_engine.is_sleeping
+    return JSONResponse(content={'is_sleeping': is_sleeping})
+
+
 """ PD Disaggregation API Begin """
 
 
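The new GET route lets clients check the sleep state before deciding whether to call /wakeup. A small client-side sketch (host, port and API key are placeholders; the route sits next to the existing /sleep and /wakeup endpoints):

import requests

resp = requests.get('http://localhost:23333/is_sleeping',
                    headers={'Authorization': 'Bearer YOUR_API_KEY'})
print(resp.json())  # e.g. {'is_sleeping': False}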
1 change: 1 addition & 0 deletions lmdeploy/turbomind/turbomind.py
@@ -213,6 +213,7 @@ def _get_model_params(self):
 
         model_comm = self.model_comm
         tm_params = self._tm_model.tm_params
+        tm_params.clear()
 
         def _get_params(device_id, que):
             rank = self.node_id * self.gpu_count + device_id
26 changes: 14 additions & 12 deletions src/turbomind/models/llama/LlamaWeight.cc
@@ -116,6 +116,7 @@ void LlamaWeight::release()
     }
 
     decoder_layer_weights.clear();
+    pinned_weights_.clear();
 
     // Wait for deallocations
     core::Context::stream().Sync();
@@ -127,21 +128,22 @@
 
 void LlamaWeight::to_device(const core::Device& device)
 {
-    core::ContextGuard guard = context();
-
-    auto to_device = [&](Tensor& x) -> Tensor {
-        auto tmp = std::exchange(x, empty_like(x, device));
-        Copy(tmp, x);
-        return tmp;
-    };
-
-    std::vector<Tensor> tmp_cpu_tensors;
     TM_CHECK(device.type == kCPU || device.type == kDEVICE);
+    core::ContextGuard guard{stream_, alloca_, Allocator{kCPUpinned}};
+
     auto tensor_ptr_map = get_parameters();
     for (auto& [name, tensor_ptr] : tensor_ptr_map) {
-        auto tmp_tensor = to_device(*tensor_ptr);
-        if (tmp_tensor.device().type != kDEVICE) {
-            tmp_cpu_tensors.push_back(tmp_tensor);
+        if (pinned_weights_.find(name) == pinned_weights_.end()) {
+            pinned_weights_[name] = empty_like(*tensor_ptr, kCPUpinned);
+        }
+        auto& pinned = pinned_weights_.at(name);
+        if (device.type == kCPU) {
+            Copy(*tensor_ptr, pinned);
+            *tensor_ptr = pinned;
+        }
+        else {
+            *tensor_ptr = empty_like(*tensor_ptr, kDEVICE);
+            Copy(pinned, *tensor_ptr);
         }
     }
     core::Context::stream().Sync();
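The LlamaWeight changes keep one pinned host buffer per weight tensor across sleep/wakeup cycles, so offloading to CPU and restoring to the GPU reuses the same staging memory instead of reallocating pageable buffers each time. The same caching idea, sketched in PyTorch terms (names are illustrative, not the TurboMind API):

import torch

pinned_cache = {}  # weight name -> pinned CPU tensor, kept alive across sleep/wakeup cycles

def offload(name, dev_tensor):
    # allocate the pinned staging buffer only once per weight
    if name not in pinned_cache:
        pinned_cache[name] = torch.empty(dev_tensor.shape, dtype=dev_tensor.dtype,
                                         device='cpu', pin_memory=True)
    pinned_cache[name].copy_(dev_tensor, non_blocking=True)
    return pinned_cache[name]

def restore(name):
    # pinned host -> device copies can run asynchronously and need no extra host allocation
    return pinned_cache[name].to('cuda', non_blocking=True)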
4 changes: 4 additions & 0 deletions src/turbomind/models/llama/LlamaWeight.h
@@ -20,6 +20,8 @@
 
 #pragma once
 
+#include <unordered_map>
+
 #include "src/turbomind/core/context.h"
 #include "src/turbomind/models/llama/LlamaDecoderLayerWeight.h"
 #include "src/turbomind/models/llama/LlamaDenseWeight.h"
@@ -75,6 +77,8 @@ struct LlamaWeight: core::Module {
     DataType data_type_;
     DataType weight_type_;
 
+    std::unordered_map<std::string, Tensor> pinned_weights_;
+
     int tp_size_; // this will follow attn tp param
     int tp_rank_;
 