Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,8 @@ tools/addlicense
tools/.addlicense.lock

.vscode/*
.claude
.claude

# macOS metadata files
.DS_Store
._*
17 changes: 10 additions & 7 deletions csrc/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void FTensorAllocator::init(const std::string &dev_str, size_t page_size,
bool contiguous_layout) {
std::lock_guard<std::mutex> lock(g_allocator_mutex_);
if (!g_allocators_.empty()) {
LOGE("FTensorAllocator has been initialized. Re-initializing...")
LOGGER(ERROR, "FTensorAllocator has been initialized. Re-initializing...");
g_allocators_.clear();
}

Expand All @@ -74,8 +74,10 @@ void FTensorAllocator::init(const std::string &dev_str, size_t page_size,
// Validate that page_size is a multiple of 2MB
size_t base_size = 2 * 1024 * 1024; // 2MB
if (page_size % base_size != 0) {
LOGE("Invalid page size: %zu, must be a multiple of 2MB (2097152 bytes)",
page_size);
LOGGER(
ERROR,
"Invalid page size: %zu, must be a multiple of 2MB (2097152 bytes)",
page_size);
abort();
}
kPageSize = page_size;
Expand Down Expand Up @@ -120,8 +122,8 @@ std::vector<torch::Tensor> FTensorAllocator::create_kv_tensors(
size_t aligned_size = size;
if (size % kPageSize != 0) {
aligned_size = ((size + kPageSize - 1) / kPageSize) * kPageSize;
LOGW("Size %zu is not aligned to page size %zu, aligning to %zu", size,
kPageSize, aligned_size);
LOGGER(WARNING, "Size %zu is not aligned to page size %zu, aligning to %zu",
size, kPageSize, aligned_size);
}
kv_tensor_size_per_layer_ = aligned_size;

Expand Down Expand Up @@ -151,7 +153,7 @@ bool FTensorAllocator::kv_tensors_created() {
bool FTensorAllocator::map_to_kv_tensors(const std::vector<offset_t> &offsets) {
std::unique_lock<std::mutex> lock(mtx_);
if (num_layers_ == 0) {
LOGE("try to map to KV tensors when KV tensors are not created");
LOGGER(ERROR, "try to map to KV tensors when KV tensors are not created");
return false;
}

Expand Down Expand Up @@ -202,7 +204,8 @@ bool FTensorAllocator::unmap_from_kv_tensors(
const std::vector<offset_t> &offsets) {
std::unique_lock<std::mutex> lock(mtx_);
if (num_layers_ == 0) {
LOGE("try to unmap from KV tensors when KV tensors are not created");
LOGGER(ERROR,
"try to unmap from KV tensors when KV tensors are not created");
return false;
}

Expand Down
4 changes: 2 additions & 2 deletions csrc/ftensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ bool FTensor::map(offset_t offset) {

page_id_t page_id = offset / page_size_;
if (mapping_.find(page_id) != mapping_.end()) {
LOGE("Page %ld is already mapped.", page_id);
LOGGER(ERROR, "Page %ld is already mapped.", page_id);
return false;
}

Expand All @@ -93,7 +93,7 @@ bool FTensor::unmap(offset_t offset) {

page_id_t page_id = offset / page_size_;
if (mapping_.find(page_id) == mapping_.end()) {
LOGE("Page %ld is not mapped.", page_id);
LOGGER(ERROR, "Page %ld is not mapped.", page_id);
return false;
}

Expand Down
64 changes: 64 additions & 0 deletions csrc/inc/cuda_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,70 @@

#include <cassert>
#include <iostream>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

// Log severity levels, ordered so that a smaller value is more severe:
// FATAL is always printed (and aborts), VERBOSE only when explicitly enabled.
typedef enum {
  FATAL = 0,
  ERROR = 1,
  WARNING = 2,
  INFO = 3,
  DEBUG = 4,
  VERBOSE = 5,
} log_level_enum_t;

// Formats the current wall-clock time into buf (implemented in a .cpp file).
extern void now_to_string(char *buf, int length);

// Level names indexed by log_level_enum_t. Declared as pointers-to-const so
// the string literals are never reachable through a writable char* (writing
// to a literal is undefined behavior); this form also compiles as both C and
// C++ without per-language casts, so no #ifdef split is needed.
__attribute__((unused)) static const char *const logger_level_str[] = {
    "FATAL", "ERROR", "WARNING", "INFO", "DEBUG", "VERBOSE"};

// glibc >= 2.30 provides a native gettid() wrapper; only define our own
// syscall-based version on older systems to avoid macro/function conflicts.
#if !defined(__GLIBC__) || !defined(__GLIBC_MINOR__) || (__GLIBC__ < 2) ||     \
    (__GLIBC__ == 2 && __GLIBC_MINOR__ < 30)
#ifndef SYS_gettid
#error "SYS_gettid unavailable on this system"
#endif
static inline pid_t gettid(void) { return (pid_t)syscall(SYS_gettid); }
#endif

// Prints a printf-style message to stderr when `level` is at or above the
// threshold selected by the KVCACHED_LOG_LEVEL environment variable (first
// letter of FATAL/ERROR/WARNING/INFO/DEBUG/VERBOSE). When the variable is
// unset OR holds an unrecognized value, the threshold defaults to WARNING
// (previously an unrecognized value silently suppressed everything except
// FATAL). A FATAL message additionally terminates the process. Implemented
// as a GNU statement expression so it can appear in expression context.
#define LOGGER(level, format, ...)                                             \
  ({                                                                           \
    const char *_level_env = getenv("KVCACHED_LOG_LEVEL");                     \
    /* Default threshold; unrecognized env values also keep this default. */   \
    int _print_level = WARNING;                                                \
    if (_level_env != NULL) {                                                  \
      if (_level_env[0] == 'F') {                                              \
        _print_level = FATAL;                                                  \
      } else if (_level_env[0] == 'E') {                                       \
        _print_level = ERROR;                                                  \
      } else if (_level_env[0] == 'W') {                                       \
        _print_level = WARNING;                                                \
      } else if (_level_env[0] == 'I') {                                       \
        _print_level = INFO;                                                   \
      } else if (_level_env[0] == 'D') {                                       \
        _print_level = DEBUG;                                                  \
      } else if (_level_env[0] == 'V') {                                       \
        _print_level = VERBOSE;                                                \
      }                                                                        \
    }                                                                          \
    if (level <= _print_level) {                                               \
      /* Format the timestamp only when the message is actually emitted;   */  \
      /* _time_buf avoids shadowing ::time from <time.h>.                  */  \
      char _time_buf[64];                                                      \
      now_to_string(_time_buf, 64);                                            \
      fprintf(stderr,                                                          \
              "[KVCACHED_MEMORY_POOL][%s][%s]%s:%d [p:%u t:%u]" format "\n",   \
              logger_level_str[level], _time_buf, __FILE__, __LINE__,          \
              (unsigned int)getpid(), (unsigned int)gettid(),                  \
              ##__VA_ARGS__);                                                  \
    }                                                                          \
    if (level == FATAL) {                                                      \
      exit(-1);                                                                \
    }                                                                          \
  })

#define LOGE(format, ...) \
fprintf(stderr, "ERROR: %s:%d: " format "\n", __FILE__, __LINE__, \
Expand Down
246 changes: 246 additions & 0 deletions csrc/inc/mem_info_tracker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// SPDX-FileCopyrightText: Copyright contributors to the kvcached project
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include <cstdint>
#include <cstring>
#include <fcntl.h>
#include <signal.h>
#include <string>
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include "cuda_utils.hpp"

namespace kvcached {

// Directory where POSIX shared-memory files are exposed as regular files.
static constexpr const char *SHM_DIR = "/dev/shm";

// Memory info record shared with the Python MemInfoStruct (same field order
// and width). On-disk/shared-memory layout is three consecutive int64
// values: [total_size, used_size, prealloc_size].
struct MemInfoStruct {
  int64_t total_size{0};
  int64_t used_size{0};
  int64_t prealloc_size{0};

  // Number of int64 fields serialized into the shared segment.
  static constexpr int N_FIELDS = 3;
  // Byte size of the shared-memory segment backing this record.
  static constexpr size_t SHM_SIZE = N_FIELDS * sizeof(int64_t);

  MemInfoStruct() = default;
  MemInfoStruct(int64_t total, int64_t used, int64_t prealloc)
      : total_size{total}, used_size{used}, prealloc_size{prealloc} {}
};

// RAII wrapper pairing a BSD flock() on a /dev/shm file with an mmap of its
// contents. Lock and mapping are released together in close() (also run by
// the destructor), so holding a live RwLockedShm means holding the lock.
class RwLockedShm {
public:
  enum LockType { RLOCK = LOCK_SH, WLOCK = LOCK_EX };

  // file_path: bare names are resolved under /dev/shm, absolute paths are
  // used verbatim. size: bytes to map. lock_type: RLOCK (shared) or WLOCK
  // (exclusive); only WLOCK may create or grow the file.
  RwLockedShm(const std::string &file_path, size_t size, LockType lock_type)
      : file_path_(get_ipc_path(file_path)), size_(size), lock_type_(lock_type),
        fd_(-1), mapped_(nullptr) {}

  ~RwLockedShm() { close(); }

  // Owns a file descriptor and a mapping; an implicit copy would unlock,
  // unmap and close the same resources twice, so copying is forbidden
  // (Rule of Five).
  RwLockedShm(const RwLockedShm &) = delete;
  RwLockedShm &operator=(const RwLockedShm &) = delete;

  // Open (creating the file first when write-locking), lock and map the
  // shared memory file. Returns true on success; on any failure every
  // partially acquired resource is released and false is returned.
  bool open() {
    fd_ = ::open(file_path_.c_str(), O_RDWR);
    if (fd_ < 0) {
      // Readers never create the file: a missing file just means the writer
      // has not published anything yet.
      if (lock_type_ != WLOCK) {
        return false;
      }
      // Create and size the file in write-lock mode.
      fd_ = ::open(file_path_.c_str(), O_RDWR | O_CREAT, 0666);
      if (fd_ < 0) {
        return false;
      }
      if (ftruncate(fd_, size_) < 0) {
        ::close(fd_);
        fd_ = -1;
        return false;
      }
    }

    // Ensure the file can back the whole mapping: touching mapped pages
    // past EOF raises SIGBUS, so a too-short file must be grown (writer)
    // or rejected (reader, which cannot modify the file).
    struct stat st;
    if (fstat(fd_, &st) == 0 && static_cast<size_t>(st.st_size) < size_) {
      if (lock_type_ != WLOCK || ftruncate(fd_, size_) < 0) {
        ::close(fd_);
        fd_ = -1;
        return false;
      }
    }

    // Block until the shared (RLOCK) or exclusive (WLOCK) lock is granted.
    if (flock(fd_, lock_type_) < 0) {
      ::close(fd_);
      fd_ = -1;
      return false;
    }

    // Map read-only for readers, read-write for the writer.
    int prot = (lock_type_ == WLOCK) ? (PROT_READ | PROT_WRITE) : PROT_READ;
    mapped_ = mmap(nullptr, size_, prot, MAP_SHARED, fd_, 0);
    if (mapped_ == MAP_FAILED) {
      mapped_ = nullptr;
      flock(fd_, LOCK_UN);
      ::close(fd_);
      fd_ = -1;
      return false;
    }

    return true;
  }

  // Unmap, unlock and close the file. Idempotent; safe to call repeatedly.
  void close() {
    if (mapped_ != nullptr) {
      munmap(mapped_, size_);
      mapped_ = nullptr;
    }
    if (fd_ >= 0) {
      flock(fd_, LOCK_UN);
      ::close(fd_);
      fd_ = -1;
    }
  }

  void *data() { return mapped_; }
  const void *data() const { return mapped_; }

  // Read MemInfoStruct fields out of the mapping; returns a zeroed struct
  // when nothing is mapped. Assumes size_ >= MemInfoStruct::SHM_SIZE.
  MemInfoStruct read_mem_info() const {
    MemInfoStruct info;
    if (mapped_) {
      const int64_t *arr = static_cast<const int64_t *>(mapped_);
      info.total_size = arr[0];
      info.used_size = arr[1];
      info.prealloc_size = arr[2];
    }
    return info;
  }

  // Write MemInfoStruct fields into the mapping (no-op when unmapped).
  // Assumes size_ >= MemInfoStruct::SHM_SIZE.
  void write_mem_info(const MemInfoStruct &info) {
    if (mapped_) {
      int64_t *arr = static_cast<int64_t *>(mapped_);
      arr[0] = info.total_size;
      arr[1] = info.used_size;
      arr[2] = info.prealloc_size;
    }
  }

private:
  // Resolve a segment name: bare names live under SHM_DIR, absolute paths
  // are kept as-is.
  static std::string get_ipc_path(const std::string &name) {
    if (name.empty())
      return "";
    if (name[0] == '/')
      return name;
    return std::string(SHM_DIR) + "/" + name;
  }

  std::string file_path_;
  size_t size_;
  LockType lock_type_;
  int fd_;       // open file descriptor, -1 when closed
  void *mapped_; // mmap base address, nullptr when unmapped
};

// MemInfoTracker: publishes memory usage info through a POSIX shared-memory
// segment so other processes (e.g. the Python controller) can observe usage
// and request pool resizes.
class MemInfoTracker {
public:
  // total_mem_size: initial KV-cache byte limit written to shared memory.
  // group_id: distinguishes multiple pools in one process; non-zero ids get
  // a "_g<id>" suffix on the default IPC name. ipc_name: explicit segment
  // name (bare name or absolute path); empty selects the default name.
  explicit MemInfoTracker(int64_t total_mem_size, int64_t group_id = 0,
                          const std::string &ipc_name = "")
      : ipc_name_(ipc_name), total_mem_size_(total_mem_size) {
    if (ipc_name_.empty()) {
      std::string base = obtain_default_ipc_name();
      // Non-zero group_id gets a "_g<id>" suffix so multiple pools
      // in one process don't share a segment.
      if (group_id != 0) {
        base += "_g" + std::to_string(group_id);
      }
      ipc_name_ = base;
    }
    init_kv_cache_limit(total_mem_size_);
    // %lld with explicit casts: int64_t is `long` on some ABIs and
    // `long long` on others, so a bare %ld is not portable.
    LOGGER(INFO,
           "MemInfoTracker initialized: ipc_name=%s, total_mem_size=%lld, "
           "group_id=%lld",
           ipc_name_.c_str(), static_cast<long long>(total_mem_size_),
           static_cast<long long>(group_id));
  }

  ~MemInfoTracker() { cleanup(); }

  // Update used/prealloc sizes in shared memory; total_size is preserved
  // (read-modify-write under the exclusive file lock).
  void update_memory_usage(int64_t used_size, int64_t prealloc_size) {
    RwLockedShm shm(ipc_name_, MemInfoStruct::SHM_SIZE, RwLockedShm::WLOCK);
    if (!shm.open()) {
      LOGGER(ERROR, "MemInfoTracker: failed to open shm for update: %s",
             ipc_name_.c_str());
      return;
    }
    MemInfoStruct info = shm.read_mem_info();
    info.used_size = used_size;
    info.prealloc_size = prealloc_size;
    shm.write_mem_info(info);
  }

  // Check whether the externally published total size implies a different
  // per-layer buffer size. Returns the new per-layer size, or -1 when no
  // resize is needed (or the segment cannot be read).
  int64_t check_and_get_resize_target(int64_t current_mem_size,
                                      int64_t num_layers,
                                      int64_t num_kv_buffers = 2) {
    // Guard the divisors: a non-positive layer/buffer count would otherwise
    // crash the process with a division by zero (SIGFPE).
    if (num_layers <= 0 || num_kv_buffers <= 0) {
      return -1;
    }
    RwLockedShm shm(ipc_name_, MemInfoStruct::SHM_SIZE, RwLockedShm::RLOCK);
    if (!shm.open()) {
      return -1;
    }
    MemInfoStruct info = shm.read_mem_info();
    int64_t new_mem_size = info.total_size / num_layers / num_kv_buffers;
    if (new_mem_size != current_mem_size) {
      return new_mem_size;
    }
    return -1;
  }

  const std::string &get_ipc_name() const { return ipc_name_; }

private:
  // Create (or overwrite) the shared segment with the initial limit and
  // zeroed used/prealloc counters.
  void init_kv_cache_limit(int64_t kv_cache_limit) {
    RwLockedShm shm(ipc_name_, MemInfoStruct::SHM_SIZE, RwLockedShm::WLOCK);
    if (!shm.open()) {
      LOGGER(ERROR, "MemInfoTracker: failed to create shm: %s",
             ipc_name_.c_str());
      return;
    }
    MemInfoStruct info(kv_cache_limit, 0, 0);
    shm.write_mem_info(info);
  }

  // Remove the backing file. Mirrors RwLockedShm's path resolution: an
  // absolute ipc_name_ is unlinked verbatim instead of being wrongly
  // prefixed with SHM_DIR (which would leave the real file behind).
  void cleanup() {
    std::string path = (!ipc_name_.empty() && ipc_name_[0] == '/')
                           ? ipc_name_
                           : std::string(SHM_DIR) + "/" + ipc_name_;
    ::unlink(path.c_str());
  }

  // Default IPC name (consistent with the Python side): the
  // KVCACHED_IPC_NAME env var when set, otherwise "kvcached_engine_<pgid>".
  static std::string obtain_default_ipc_name() {
    const char *env_name = std::getenv("KVCACHED_IPC_NAME");
    if (env_name && env_name[0] != '\0') {
      return std::string(env_name);
    }
    pid_t pgid = getpgrp();
    char buf[256];
    snprintf(buf, sizeof(buf), "kvcached_engine_%d", static_cast<int>(pgid));
    return std::string(buf);
  }

  std::string ipc_name_;     // shared segment name (or absolute path)
  int64_t total_mem_size_;   // limit written at construction time
};

} // namespace kvcached
Loading
Loading