Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TransferEngine] Improve local-to-local memcpy performance #134

Closed
wants to merge 10 commits into from
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ With 40 GB of data (equivalent to the size of the KVCache generated by 128k toke
P2P Store is built on the Transfer Engine and supports sharing temporary objects between peer nodes in a cluster. P2P Store is ideal for scenarios like checkpoint transfer, where data needs to be rapidly and efficiently shared across a cluster.
**P2P Store has been used in the checkpoint transfer service of Moonshot AI.**

### Mooncake Store ([Guide](doc/en/mooncake-store-preview.md))
Mooncake Store is a distributed KVCache storage engine specialized for LLM inference. It offers object-level APIs (`Put`, `Get` and `Remove`), and we will soon release a new vLLM integration to demonstrate xPyD disaggregation. Mooncake Store is the central component of the KVCache-centric disaggregated architecture.

#### Highlights
- **Decentralized architecture.** P2P Store leverages a pure client-side architecture with global metadata managed by the etcd service.

Expand Down
3 changes: 2 additions & 1 deletion mooncake-p2p-store/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ fi
# fi

EXT_LDFLAGS="-L$PROJECT_ROOT_DIRECTORY/build/mooncake-transfer-engine/src"
EXT_LDFLAGS+=" -L$PROJECT_ROOT_DIRECTORY/build/mooncake-transfer-engine/src/common/base"
EXT_LDFLAGS+=" -L$PROJECT_ROOT_DIRECTORY/thirdparties/lib"
EXT_LDFLAGS+=" -ltransfer_engine -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc"
EXT_LDFLAGS+=" -ltransfer_engine -lbase -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc -lboost_thread"
# EXT_LDFLAGS+=" -lhiredis" // if USE_REDIS is enabled
# EXT_LDFLAGS+=" -lcurl" // if USE_HTTP is enabled

Expand Down
4 changes: 4 additions & 0 deletions mooncake-store/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ ErrorCode Client::InitTransferEngine(const std::string& local_hostname,
}
CHECK(transport) << "Failed to install transport";

LOG(INFO) << "transport_type=shm";
transport = transfer_engine_->installTransport("shm", protocol_args);
CHECK(transport) << "Failed to install transport";

return ErrorCode::OK;
}

Expand Down
12 changes: 6 additions & 6 deletions mooncake-store/tests/distributed_object_store_provider.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import os
import time
from distributed_object_store import distributed_object_store
from mooncake_vllm_adaptor import MooncakeDistributedStore

def startProvider():
# Initialize the store
store = distributed_object_store()
store = MooncakeDistributedStore()
# Use TCP protocol by default for testing
protocol = os.getenv("PROTOCOL", "tcp")
protocol = os.getenv("PROTOCOL", "rdma")
device_name = os.getenv("DEVICE_NAME", "ibp6s0")
local_hostname = os.getenv("LOCAL_HOSTNAME", "localhost:12355")
metadata_server = os.getenv("METADATA_ADDR", "127.0.0.1:2379")
local_hostname = os.getenv("LOCAL_HOSTNAME", "10.1.100.3:12355")
metadata_server = os.getenv("METADATA_ADDR", "10.1.101.3:2379")
global_segment_size = 3200 * 1024 * 1024
local_buffer_size = 512 * 1024 * 1024
master_server_address = os.getenv("MASTER_SERVER", "127.0.0.1:50051")
master_server_address = os.getenv("MASTER_SERVER", "10.1.101.3:50051")
retcode = store.setup(local_hostname,
metadata_server,
global_segment_size,
Expand Down
23 changes: 16 additions & 7 deletions mooncake-store/tests/stress_cluster_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,43 @@ def setup(cls):

def prefill(cls):
index = 0
start = time.time()
value = os.urandom(cls.value_length)
while index < cls.max_requests:
key = "k_" + str(index)
value = bytes([index % 256] * cls.value_length)
# value = bytes([index % 256] * cls.value_length)
retcode = cls.store.put(key, value)
if retcode:
print("WARNING: put failed, key", key)
if random.randint(0, 100) < 98:
index = index + 1
if index % 500 == 0:
print("completed", index, "entries");
time.sleep(20) # wait for decode
print("completed", index, "entries")
end = time.time()
print("elapsed", end - start)
print("bandwidth", cls.value_length * cls.max_requests / (end - start))
time.sleep(5) # wait for decode

def decode(cls):
index = 0
start = time.time()
while index < cls.max_requests:
key = "k_" + str(index)
value = cls.store.get(key)
if len(value) == 0:
print("WARNING: get failed, key", key)
else:
expected_value = bytes([index % 256] * cls.value_length)
if value != expected_value:
print("WARNING: get data corrupted, key", key)
pass
# expected_value = bytes([index % 256] * cls.value_length)
# if value != expected_value:
# print("WARNING: get data corrupted, key", key)
index = index + 1
if index % 500 == 0:
print("completed", index, "entries");
time.sleep(20) # wait for decode
end = time.time()
print("elapsed", end - start)
print("bandwidth", cls.value_length * cls.max_requests / (end - start))
time.sleep(5) # wait for decode


if __name__ == '__main__':
Expand Down
16 changes: 9 additions & 7 deletions mooncake-store/tests/stress_workload_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class RandomGen {
}
};

const std::string kLocalHostName = "10.1.100.3:12355";
const std::string kMetadataServerAddress = "10.1.101.3:2379";

class ClientIntegrationTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
Expand All @@ -114,15 +117,15 @@ class ClientIntegrationTest : public ::testing::Test {
const size_t ram_buffer_size = 3200ull * 1024 * 1024;
segment_ptr_ = allocate_buffer_allocator_memory(ram_buffer_size);
ASSERT_TRUE(segment_ptr_);
ErrorCode rc = client_->MountSegment("localhost:12345", segment_ptr_,
ErrorCode rc = client_->MountSegment(kLocalHostName, segment_ptr_,
ram_buffer_size);
if (rc != ErrorCode::OK) {
LOG(ERROR) << "Failed to mount segment: " << toString(rc);
}
}

static void CleanupSegment() {
if (client_->UnmountSegment("localhost:12345", segment_ptr_) !=
if (client_->UnmountSegment(kLocalHostName, segment_ptr_) !=
ErrorCode::OK) {
LOG(ERROR) << "Failed to unmount segment";
}
Expand All @@ -132,11 +135,11 @@ class ClientIntegrationTest : public ::testing::Test {
client_ = std::make_unique<Client>();
void** args =
(FLAGS_protocol == "rdma") ? rdma_args(FLAGS_device_name) : nullptr;
ASSERT_EQ(client_->Init("localhost:12345", // Local hostname
"127.0.0.1:2379", // Metadata connection string
ASSERT_EQ(client_->Init(kLocalHostName, // Local hostname
kMetadataServerAddress, // Metadata connection string
FLAGS_protocol, args, FLAGS_master_address),
ErrorCode::OK);
auto client_buffer_allocator_size = 128 * 1024 * 1024;
auto client_buffer_allocator_size = 256 * 1024 * 1024;
client_buffer_allocator_ =
std::make_unique<SimpleAllocator>(client_buffer_allocator_size);
ErrorCode rc = client_->RegisterLocalMemory(
Expand Down Expand Up @@ -172,7 +175,7 @@ TEST_F(ClientIntegrationTest, StressPutOperations) {
pthread_barrier_init(&barrier, nullptr, kThreads + 1);
std::vector<std::thread> runner_list;
const int rand_len = 100;
const int value_length = kMaxSliceSize;
const int value_length = 2 * 1024 * 1024;

for (int i = 0; i < kThreads; ++i) {
runner_list.push_back(std::thread([&]() {
Expand All @@ -195,7 +198,6 @@ TEST_F(ClientIntegrationTest, StressPutOperations) {
ErrorCode::OK);
ASSERT_EQ(client_->Get(entry.first.data(), slices),
ErrorCode::OK);
ASSERT_EQ(client_->Remove(entry.first.data()), ErrorCode::OK);
}
pthread_barrier_wait(&barrier);
client_buffer_allocator_->deallocate(write_buffer, value_length);
Expand Down
1 change: 1 addition & 0 deletions mooncake-transfer-engine/include/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define ERR_BATCH_BUSY (-4)
#define ERR_DEVICE_NOT_FOUND (-6)
#define ERR_ADDRESS_OVERLAPPED (-7)
#define ERR_NOT_LOCAL_SEGMENT (-8)

#define ERR_DNS (-101)
#define ERR_SOCKET (-102)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2025 KVCache.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef SHM_TRANSPORT_H_
#define SHM_TRANSPORT_H_

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <functional>
#include <iostream>
#include <queue>

#include "transfer_metadata.h"
#include "transport/transport.h"

namespace mooncake {
class TransferMetadata;

class ThreadPool {
public:
ThreadPool(size_t threadCount)
: ioService_(),
work_(boost::asio::make_work_guard(ioService_)),
stopped_(false) {
for (size_t i = 0; i < threadCount; ++i) {
threads_.create_thread(
boost::bind(&boost::asio::io_service::run, &ioService_));
}
}

~ThreadPool() { stop(); }

void submit(std::function<void()> task) {
ioService_.post(std::move(task));
}

void stop() {
if (!stopped_) {
stopped_ = true;
ioService_.stop();
threads_.join_all();
}
}

private:
boost::asio::io_service ioService_;
boost::asio::executor_work_guard<boost::asio::io_service::executor_type>
work_; // 保持 io_service 运行
boost::thread_group threads_;
bool stopped_;
};

/// Transport implementation that reports its name as "shm".
///
/// NOTE(review): judging by the name and by how the engine selects it for
/// requests targeting the local segment, this presumably serves
/// same-host (local-to-local) copies; the implementation is not in this
/// header, so confirm against shm_transport.cpp.
class ShmTransport : public Transport {
   public:
    using BufferDesc = TransferMetadata::BufferDesc;
    using SegmentDesc = TransferMetadata::SegmentDesc;

    ShmTransport();

    ~ShmTransport();

    /// Submits the transfer requests in `entries` as part of batch
    /// `batch_id`.
    Status submitTransfer(BatchID batch_id,
                          const std::vector<TransferRequest> &entries) override;

    /// Submits already-created tasks; `request_list` and `task_list` are
    /// presumably parallel vectors (TODO confirm against the base class).
    Status submitTransferTask(
        const std::vector<TransferRequest *> &request_list,
        const std::vector<TransferTask *> &task_list) override;

    /// Writes the state of task `task_id` within batch `batch_id` into
    /// `status`.
    Status getTransferStatus(BatchID batch_id, size_t task_id,
                             TransferStatus &status) override;

   private:
    /// Binds this transport to the local server name, metadata service and
    /// topology.
    int install(std::string &local_server_name,
                std::shared_ptr<TransferMetadata> meta,
                std::shared_ptr<Topology> topo);

    /// Begins processing of a single slice.
    void startTransfer(Slice *slice);

    int registerLocalMemory(void *addr, size_t length,
                            const std::string &location, bool remote_accessible,
                            bool update_metadata);

    int unregisterLocalMemory(void *addr, bool update_metadata = false);

    int registerLocalMemoryBatch(
        const std::vector<Transport::BufferEntry> &buffer_list,
        const std::string &location);

    int unregisterLocalMemoryBatch(
        const std::vector<void *> &addr_list) override;

    /// Protocol name of this transport.
    const char *getName() const override { return "shm"; }

   private:
    std::atomic<bool> running_;
    ThreadPool thread_pool_;  // Worker threads owned by this transport.
};
} // namespace mooncake

#endif
8 changes: 4 additions & 4 deletions mooncake-transfer-engine/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ if (USE_HTTP)
target_link_libraries(transfer_engine PUBLIC ${CURL_LIBRARIES})
endif()
target_link_libraries(
transfer_engine
PUBLIC
base transport rdma_transport ibverbs glog gflags pthread jsoncpp numa
)
transfer_engine
PUBLIC
base transport rdma_transport ibverbs glog gflags pthread jsoncpp numa boost_thread
)

if (USE_CUDA)
target_include_directories(transfer_engine PRIVATE /usr/local/cuda/include)
Expand Down
7 changes: 5 additions & 2 deletions mooncake-transfer-engine/src/multi_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "transport/rdma_transport/rdma_transport.h"
#include "transport/tcp_transport/tcp_transport.h"
#include "transport/shm_transport/shm_transport.h"
#include "transport/transport.h"
#ifdef USE_NVMEOF
#include "transport/nvmeof_transport/nvmeof_transport.h"
Expand Down Expand Up @@ -137,6 +138,8 @@ Transport *MultiTransport::installTransport(const std::string &proto,
transport = new RdmaTransport();
} else if (std::string(proto) == "tcp") {
transport = new TcpTransport();
} else if (std::string(proto) == "shm") {
transport = new ShmTransport();
}
#ifdef USE_NVMEOF
else if (std::string(proto) == "nvmeof") {
Expand All @@ -159,8 +162,8 @@ Transport *MultiTransport::installTransport(const std::string &proto,
}

Transport *MultiTransport::selectTransport(const TransferRequest &entry) {
if (entry.target_id == LOCAL_SEGMENT_ID && transport_map_.count("local"))
return transport_map_["local"].get();
if (entry.target_id == LOCAL_SEGMENT_ID && transport_map_.count("shm"))
return transport_map_["shm"].get();
auto target_segment_desc = metadata_->getSegmentDescByID(entry.target_id);
if (!target_segment_desc) {
LOG(ERROR) << "MultiTransport: Incorrect target segment id "
Expand Down
3 changes: 3 additions & 0 deletions mooncake-transfer-engine/src/transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ add_library(transport OBJECT ${XPORT_SOURCES} $<TARGET_OBJECTS:rdma_transport>)
add_subdirectory(tcp_transport)
target_sources(transport PUBLIC $<TARGET_OBJECTS:tcp_transport>)

add_subdirectory(shm_transport)
target_sources(transport PUBLIC $<TARGET_OBJECTS:shm_transport>)

if (USE_NVMEOF)
add_subdirectory(nvmeof_transport)
target_sources(transport PUBLIC $<TARGET_OBJECTS:nvmeof_transport>)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
file(GLOB SHM_SOURCES "*.cpp")

add_library(shm_transport OBJECT ${SHM_SOURCES})
Loading