diff --git a/README.md b/README.md index 7fd7a0df..7af3eae4 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,9 @@ With 40 GB of data (equivalent to the size of the KVCache generated by 128k toke P2P Store is built on the Transfer Engine and supports sharing temporary objects between peer nodes in a cluster. P2P Store is ideal for scenarios like checkpoint transfer, where data needs to be rapidly and efficiently shared across a cluster. **P2P Store has been used in the checkpoint transfer service of Moonshot AI.** +### Mooncake Store ([Guide](doc/en/mooncake-store-preview.md)) +Mooncake Store is a distributed KVCache storage engine specialized for LLM inference. It offers object-level APIs (`Put`, `Get` and `Remove`), and we will soon release an new vLLM integration to demonstrate xPyD disaggregation. Mooncake Store is the central component of the KVCache-centric disaggregated architecture. + #### Highlights - **Decentralized architecture.** P2P Store leverages a pure client-side architecture with global metadata managed by the etcd service. diff --git a/mooncake-p2p-store/build.sh b/mooncake-p2p-store/build.sh index 0549f867..4a60c902 100644 --- a/mooncake-p2p-store/build.sh +++ b/mooncake-p2p-store/build.sh @@ -34,8 +34,9 @@ fi # fi EXT_LDFLAGS="-L$PROJECT_ROOT_DIRECTORY/build/mooncake-transfer-engine/src" +EXT_LDFLAGS+=" -L$PROJECT_ROOT_DIRECTORY/build/mooncake-transfer-engine/src/common/base" EXT_LDFLAGS+=" -L$PROJECT_ROOT_DIRECTORY/thirdparties/lib" -EXT_LDFLAGS+=" -ltransfer_engine -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc" +EXT_LDFLAGS+=" -ltransfer_engine -lbase -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc -lboost_thread" # EXT_LDFLAGS+=" -lhiredis" // if USE_REDIS is enabled # EXT_LDFLAGS+=" -lcurl" // if USE_HTTP is enabled diff --git a/mooncake-store/src/client.cpp b/mooncake-store/src/client.cpp index 000085f4..96e89bdd 100644 --- a/mooncake-store/src/client.cpp +++ b/mooncake-store/src/client.cpp @@ -80,6 +80,10 @@ ErrorCode Client::InitTransferEngine(const std::string& local_hostname, } CHECK(transport) << "Failed to install transport"; + LOG(INFO) << "transport_type=shm"; + transport = transfer_engine_->installTransport("shm", protocol_args); + CHECK(transport) << "Failed to install transport"; + return ErrorCode::OK; } diff --git a/mooncake-store/tests/distributed_object_store_provider.py b/mooncake-store/tests/distributed_object_store_provider.py index be0d8633..21bd8398 100644 --- a/mooncake-store/tests/distributed_object_store_provider.py +++ b/mooncake-store/tests/distributed_object_store_provider.py @@ -1,18 +1,18 @@ import os import time -from distributed_object_store import distributed_object_store +from mooncake_vllm_adaptor import MooncakeDistributedStore def startProvider(): # Initialize the store - store = distributed_object_store() + store = MooncakeDistributedStore() # Use TCP protocol by default for testing - protocol = os.getenv("PROTOCOL", "tcp") + protocol = os.getenv("PROTOCOL", "rdma") device_name = os.getenv("DEVICE_NAME", "ibp6s0") - local_hostname = os.getenv("LOCAL_HOSTNAME", "localhost:12355") - metadata_server = os.getenv("METADATA_ADDR", "127.0.0.1:2379") + local_hostname = os.getenv("LOCAL_HOSTNAME", "10.1.100.3:12355") + metadata_server = os.getenv("METADATA_ADDR", "10.1.101.3:2379") global_segment_size = 3200 * 1024 * 1024 local_buffer_size = 512 * 1024 * 1024 - master_server_address = os.getenv("MASTER_SERVER", "127.0.0.1:50051") + master_server_address = os.getenv("MASTER_SERVER", "10.1.101.3:50051") retcode = store.setup(local_hostname, metadata_server, global_segment_size, diff --git a/mooncake-store/tests/stress_cluster_benchmark.py b/mooncake-store/tests/stress_cluster_benchmark.py index 3e8a3485..2c14832b 100644 --- a/mooncake-store/tests/stress_cluster_benchmark.py +++ b/mooncake-store/tests/stress_cluster_benchmark.py @@ -34,34 +34,43 @@ def setup(cls): def prefill(cls): index = 0 + start = time.time() value = os.urandom(cls.value_length) while index < cls.max_requests: key = "k_" + str(index) - value = bytes([index % 256] * cls.value_length) + # value = bytes([index % 256] * cls.value_length) retcode = cls.store.put(key, value) if retcode: print("WARNING: put failed, key", key) if random.randint(0, 100) < 98: index = index + 1 if index % 500 == 0: - print("completed", index, "entries"); - time.sleep(20) # wait for decode + print("completed", index, "entries") + end = time.time() + print("elapsed", end - start) + print("bandwidth", cls.value_length * cls.max_requests / (end - start)) + time.sleep(5) # wait for decode def decode(cls): index = 0 + start = time.time() while index < cls.max_requests: key = "k_" + str(index) value = cls.store.get(key) if len(value) == 0: print("WARNING: get failed, key", key) else: - expected_value = bytes([index % 256] * cls.value_length) - if value != expected_value: - print("WARNING: get data corrupted, key", key) + pass + # expected_value = bytes([index % 256] * cls.value_length) + # if value != expected_value: + # print("WARNING: get data corrupted, key", key) index = index + 1 if index % 500 == 0: print("completed", index, "entries"); - time.sleep(20) # wait for decode + end = time.time() + print("elapsed", end - start) + print("bandwidth", cls.value_length * cls.max_requests / (end - start)) + time.sleep(5) # wait for decode if __name__ == '__main__': diff --git a/mooncake-store/tests/stress_workload_test.cpp b/mooncake-store/tests/stress_workload_test.cpp index 28048732..03cec337 100644 --- a/mooncake-store/tests/stress_workload_test.cpp +++ b/mooncake-store/tests/stress_workload_test.cpp @@ -89,6 +89,9 @@ class RandomGen { } }; +const std::string kLocalHostName = "10.1.100.3:12355"; +const std::string kMetadataServerAddress = "10.1.101.3:2379"; + class ClientIntegrationTest : public ::testing::Test { protected: static void SetUpTestSuite() { @@ -114,7 +117,7 @@ class ClientIntegrationTest : public ::testing::Test { const size_t ram_buffer_size = 3200ull * 1024 * 1024; segment_ptr_ = allocate_buffer_allocator_memory(ram_buffer_size); ASSERT_TRUE(segment_ptr_); - ErrorCode rc = client_->MountSegment("localhost:12345", segment_ptr_, + ErrorCode rc = client_->MountSegment(kLocalHostName, segment_ptr_, ram_buffer_size); if (rc != ErrorCode::OK) { LOG(ERROR) << "Failed to mount segment: " << toString(rc); @@ -122,7 +125,7 @@ class ClientIntegrationTest : public ::testing::Test { } static void CleanupSegment() { - if (client_->UnmountSegment("localhost:12345", segment_ptr_) != + if (client_->UnmountSegment(kLocalHostName, segment_ptr_) != ErrorCode::OK) { LOG(ERROR) << "Failed to unmount segment"; } @@ -132,11 +135,11 @@ class ClientIntegrationTest : public ::testing::Test { client_ = std::make_unique(); void** args = (FLAGS_protocol == "rdma") ? rdma_args(FLAGS_device_name) : nullptr; - ASSERT_EQ(client_->Init("localhost:12345", // Local hostname - "127.0.0.1:2379", // Metadata connection string + ASSERT_EQ(client_->Init(kLocalHostName, // Local hostname + kMetadataServerAddress, // Metadata connection string FLAGS_protocol, args, FLAGS_master_address), ErrorCode::OK); - auto client_buffer_allocator_size = 128 * 1024 * 1024; + auto client_buffer_allocator_size = 256 * 1024 * 1024; client_buffer_allocator_ = std::make_unique(client_buffer_allocator_size); ErrorCode rc = client_->RegisterLocalMemory( @@ -172,7 +175,7 @@ TEST_F(ClientIntegrationTest, StressPutOperations) { pthread_barrier_init(&barrier, nullptr, kThreads + 1); std::vector runner_list; const int rand_len = 100; - const int value_length = kMaxSliceSize; + const int value_length = 2 * 1024 * 1024; for (int i = 0; i < kThreads; ++i) { runner_list.push_back(std::thread([&]() { @@ -195,7 +198,6 @@ TEST_F(ClientIntegrationTest, StressPutOperations) { ErrorCode::OK); ASSERT_EQ(client_->Get(entry.first.data(), slices), ErrorCode::OK); - ASSERT_EQ(client_->Remove(entry.first.data()), ErrorCode::OK); } pthread_barrier_wait(&barrier); client_buffer_allocator_->deallocate(write_buffer, value_length); diff --git a/mooncake-transfer-engine/include/error.h b/mooncake-transfer-engine/include/error.h index af1e5015..1529a8f6 100644 --- a/mooncake-transfer-engine/include/error.h +++ b/mooncake-transfer-engine/include/error.h @@ -21,6 +21,7 @@ #define ERR_BATCH_BUSY (-4) #define ERR_DEVICE_NOT_FOUND (-6) #define ERR_ADDRESS_OVERLAPPED (-7) +#define ERR_NOT_LOCAL_SEGMENT (-8) #define ERR_DNS (-101) #define ERR_SOCKET (-102) diff --git a/mooncake-transfer-engine/include/transport/shm_transport/shm_transport.h b/mooncake-transfer-engine/include/transport/shm_transport/shm_transport.h new file mode 100644 index 00000000..90f5132b --- /dev/null +++ b/mooncake-transfer-engine/include/transport/shm_transport/shm_transport.h @@ -0,0 +1,112 @@ +// Copyright 2025 KVCache.AI +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef SHM_TRANSPORT_H_ +#define SHM_TRANSPORT_H_ + +#include +#include +#include +#include +#include + +#include "transfer_metadata.h" +#include "transport/transport.h" + +namespace mooncake { +class TransferMetadata; + +class ThreadPool { + public: + ThreadPool(size_t threadCount) + : ioService_(), + work_(boost::asio::make_work_guard(ioService_)), + stopped_(false) { + for (size_t i = 0; i < threadCount; ++i) { + threads_.create_thread( + boost::bind(&boost::asio::io_service::run, &ioService_)); + } + } + + ~ThreadPool() { stop(); } + + void submit(std::function task) { + ioService_.post(std::move(task)); + } + + void stop() { + if (!stopped_) { + stopped_ = true; + ioService_.stop(); + threads_.join_all(); + } + } + + private: + boost::asio::io_service ioService_; + boost::asio::executor_work_guard + work_; // 保持 io_service 运行 + boost::thread_group threads_; + bool stopped_; +}; + +class ShmTransport : public Transport { + public: + using BufferDesc = TransferMetadata::BufferDesc; + using SegmentDesc = TransferMetadata::SegmentDesc; + + public: + ShmTransport(); + + ~ShmTransport(); + + Status submitTransfer(BatchID batch_id, + const std::vector &entries) override; + + Status submitTransferTask( + const std::vector &request_list, + const std::vector &task_list) override; + + Status getTransferStatus(BatchID batch_id, size_t task_id, + TransferStatus &status) override; + + private: + int install(std::string &local_server_name, + std::shared_ptr meta, + std::shared_ptr topo); + + void startTransfer(Slice *slice); + + int registerLocalMemory(void *addr, size_t length, + const std::string &location, bool remote_accessible, + bool update_metadata); + + int unregisterLocalMemory(void *addr, bool update_metadata = false); + + int registerLocalMemoryBatch( + const std::vector &buffer_list, + const std::string &location); + + int unregisterLocalMemoryBatch( + const std::vector &addr_list) override; + + const char *getName() const override { return "shm"; } + + private: + std::atomic_bool running_; + ThreadPool thread_pool_; +}; +} // namespace mooncake + +#endif \ No newline at end of file diff --git a/mooncake-transfer-engine/src/CMakeLists.txt b/mooncake-transfer-engine/src/CMakeLists.txt index dbce84f8..6af5dcc3 100644 --- a/mooncake-transfer-engine/src/CMakeLists.txt +++ b/mooncake-transfer-engine/src/CMakeLists.txt @@ -25,10 +25,10 @@ if (USE_HTTP) target_link_libraries(transfer_engine PUBLIC ${CURL_LIBRARIES}) endif() target_link_libraries( - transfer_engine - PUBLIC - base transport rdma_transport ibverbs glog gflags pthread jsoncpp numa - ) + transfer_engine + PUBLIC + base transport rdma_transport ibverbs glog gflags pthread jsoncpp numa boost_thread +) if (USE_CUDA) target_include_directories(transfer_engine PRIVATE /usr/local/cuda/include) diff --git a/mooncake-transfer-engine/src/multi_transport.cpp b/mooncake-transfer-engine/src/multi_transport.cpp index 41523bf3..5f3b1e65 100644 --- a/mooncake-transfer-engine/src/multi_transport.cpp +++ b/mooncake-transfer-engine/src/multi_transport.cpp @@ -16,6 +16,7 @@ #include "transport/rdma_transport/rdma_transport.h" #include "transport/tcp_transport/tcp_transport.h" +#include "transport/shm_transport/shm_transport.h" #include "transport/transport.h" #ifdef USE_NVMEOF #include "transport/nvmeof_transport/nvmeof_transport.h" @@ -137,6 +138,8 @@ Transport *MultiTransport::installTransport(const std::string &proto, transport = new RdmaTransport(); } else if (std::string(proto) == "tcp") { transport = new TcpTransport(); + } else if (std::string(proto) == "shm") { + transport = new ShmTransport(); } #ifdef USE_NVMEOF else if (std::string(proto) == "nvmeof") { @@ -159,8 +162,8 @@ Transport *MultiTransport::installTransport(const std::string &proto, } Transport *MultiTransport::selectTransport(const TransferRequest &entry) { - if (entry.target_id == LOCAL_SEGMENT_ID && transport_map_.count("local")) - return transport_map_["local"].get(); + if (entry.target_id == LOCAL_SEGMENT_ID && transport_map_.count("shm")) + return transport_map_["shm"].get(); auto target_segment_desc = metadata_->getSegmentDescByID(entry.target_id); if (!target_segment_desc) { LOG(ERROR) << "MultiTransport: Incorrect target segment id " diff --git a/mooncake-transfer-engine/src/transport/CMakeLists.txt b/mooncake-transfer-engine/src/transport/CMakeLists.txt index b0938da4..faf7fd7f 100644 --- a/mooncake-transfer-engine/src/transport/CMakeLists.txt +++ b/mooncake-transfer-engine/src/transport/CMakeLists.txt @@ -6,6 +6,9 @@ add_library(transport OBJECT ${XPORT_SOURCES} $) add_subdirectory(tcp_transport) target_sources(transport PUBLIC $) +add_subdirectory(shm_transport) +target_sources(transport PUBLIC $) + if (USE_NVMEOF) add_subdirectory(nvmeof_transport) target_sources(transport PUBLIC $) diff --git a/mooncake-transfer-engine/src/transport/shm_transport/CMakeLists.txt b/mooncake-transfer-engine/src/transport/shm_transport/CMakeLists.txt new file mode 100644 index 00000000..611f9ec1 --- /dev/null +++ b/mooncake-transfer-engine/src/transport/shm_transport/CMakeLists.txt @@ -0,0 +1,3 @@ +file(GLOB SHM_SOURCES "*.cpp") + +add_library(shm_transport OBJECT ${SHM_SOURCES}) diff --git a/mooncake-transfer-engine/src/transport/shm_transport/shm_transport.cpp b/mooncake-transfer-engine/src/transport/shm_transport/shm_transport.cpp new file mode 100644 index 00000000..c46e33f6 --- /dev/null +++ b/mooncake-transfer-engine/src/transport/shm_transport/shm_transport.cpp @@ -0,0 +1,174 @@ +// Copyright 2024 KVCache.AI +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "transport/shm_transport/shm_transport.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "transfer_engine.h" +#include "transfer_metadata.h" +#include "transport/transport.h" + +namespace mooncake { +const static size_t kDefaultThreadPoolSize = 4; + +ShmTransport::ShmTransport() : thread_pool_(kDefaultThreadPoolSize) { + // TODO +} + +ShmTransport::~ShmTransport() { + // TODO +} + +int ShmTransport::install(std::string &local_server_name, + std::shared_ptr meta, + std::shared_ptr topo) { + // TODO currently support transfer in the same process, + // so segment registration is not needed + return 0; +} + +Status ShmTransport::getTransferStatus(BatchID batch_id, size_t task_id, + TransferStatus &status) { + auto &batch_desc = *((BatchDesc *)(batch_id)); + const size_t task_count = batch_desc.task_list.size(); + if (task_id >= task_count) return Status::InvalidArgument("too many tasks"); + auto &task = batch_desc.task_list[task_id]; + status.transferred_bytes = task.transferred_bytes; + uint64_t success_slice_count = task.success_slice_count; + uint64_t failed_slice_count = task.failed_slice_count; + if (success_slice_count + failed_slice_count == task.slice_count) { + if (failed_slice_count) { + status.s = TransferStatusEnum::FAILED; + } else { + status.s = TransferStatusEnum::COMPLETED; + } + task.is_finished = true; + } else { + status.s = TransferStatusEnum::WAITING; + } + return Status::OK(); +} + +Status ShmTransport::submitTransfer(BatchID batch_id, + const std::vector &entries) { + auto &batch_desc = *((BatchDesc *)(batch_id)); + if (batch_desc.task_list.size() + entries.size() > batch_desc.batch_size) { + LOG(ERROR) << "ShmTransport: Exceed the limitation of current batch's " + "capacity"; + return Status::InvalidArgument("exceed the limitation of current batch's capacity"); + } + + size_t task_id = batch_desc.task_list.size(); + batch_desc.task_list.resize(task_id + entries.size()); + for (auto &request : entries) { + if (request.target_id != LOCAL_SEGMENT_ID) { + LOG(ERROR) << "ShmTransport: Not local segment"; + return Status::InvalidArgument("Not local segment"); + } + TransferTask &task = batch_desc.task_list[task_id]; + ++task_id; + task.total_bytes = request.length; + auto slice = new Slice(); + slice->source_addr = (char *)request.source; + slice->local.dest_addr = (char *)request.target_offset; + slice->length = request.length; + slice->opcode = request.opcode; + slice->task = &task; + slice->target_id = request.target_id; + slice->status = Slice::PENDING; + task.slice_count += 1; + startTransfer(slice); + } + + return Status::OK(); +} + +Status ShmTransport::submitTransferTask( + const std::vector &request_list, + const std::vector &task_list) { + for (size_t index = 0; index < request_list.size(); ++index) { + auto &request = *request_list[index]; + auto &task = *task_list[index]; + if (request.target_id != LOCAL_SEGMENT_ID) { + LOG(ERROR) << "ShmTransport: Not local segment"; + return Status::InvalidArgument("Not local segment"); + } + task.total_bytes = request.length; + auto slice = new Slice(); + slice->source_addr = (char *)request.source; + slice->local.dest_addr = (char *)request.target_offset; + slice->length = request.length; + slice->opcode = request.opcode; + slice->task = &task; + slice->target_id = request.target_id; + slice->status = Slice::PENDING; + task.slice_count += 1; + startTransfer(slice); + } + return Status::OK(); +} + +void ShmTransport::startTransfer(Slice *slice) { + thread_pool_.submit([slice]() { +#ifdef USE_CUDA + if (slice->opcode == TransferRequest::READ) + cudaMemcpy(slice->source_addr, (void *)slice->local.dest_addr, + slice->length, cudaMemcpyDefault); + else + cudaMemcpy((void *)slice->local.dest_addr, slice->source_addr, + slice->length, cudaMemcpyDefault); +#else + if (slice->opcode == TransferRequest::READ) + memcpy(slice->source_addr, (void *)slice->local.dest_addr, + slice->length); + else + memcpy((void *)slice->local.dest_addr, slice->source_addr, + slice->length); +#endif + slice->markSuccess(); + }); +} + +int ShmTransport::registerLocalMemory(void *addr, size_t length, + const std::string &location, + bool remote_accessible, + bool update_metadata) { + return 0; +} + +int ShmTransport::unregisterLocalMemory(void *addr, bool update_metadata) { + return 0; +} + +int ShmTransport::registerLocalMemoryBatch( + const std::vector &buffer_list, + const std::string &location) { + return 0; +} + +int ShmTransport::unregisterLocalMemoryBatch( + const std::vector &addr_list) { + return 0; +} +} // namespace mooncake \ No newline at end of file