Skip to content

Commit 20a94fc

Browse files
yuan-luoluoyuan.luo
and
luoyuan.luo
authored
[TransferEngine] Support Status return value (#125)
Co-authored-by: luoyuan.luo <[email protected]>
1 parent d6cd73b commit 20a94fc

29 files changed

+668
-195
lines changed

CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ execute_process(
4747
string(STRIP ${PYTHON_SYS_PATH} PYTHON_SYS_PATH)
4848

4949
set(PYBIND11_FINDPYTHON ON)
50-
find_package(pybind11 CONFIG REQUIRED)
50+
find_package(pybind11 CONFIG REQUIRED)
5151

5252
if (USE_CUDA)
5353
add_compile_definitions(USE_CUDA)

mooncake-integration/vllm/vllm_adaptor.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,13 @@ int VLLMAdaptor::transferSync(const char *target_hostname, uintptr_t buffer,
201201
entry.target_id = handle;
202202
entry.target_offset = peer_buffer_address;
203203

204-
int ret = engine_->submitTransfer(batch_id, {entry});
205-
if (ret < 0) return -1;
204+
Status s = engine_->submitTransfer(batch_id, {entry});
205+
if (!s.ok()) return -1;
206206

207207
TransferStatus status;
208208
while (true) {
209-
int ret = engine_->getTransferStatus(batch_id, 0, status);
210-
LOG_ASSERT(!ret);
209+
Status s = engine_->getTransferStatus(batch_id, 0, status);
210+
LOG_ASSERT(s.ok());
211211
if (status.s == TransferStatusEnum::COMPLETED) {
212212
engine_->freeBatchID(batch_id);
213213
return 0;

mooncake-integration/vllm/vllm_adaptor.h

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <stack>
2525
#include <vector>
2626

27+
#include "common/base/status.h"
2728
#include "transfer_engine.h"
2829
#include "transport/rdma_transport/rdma_transport.h"
2930
#include "transport/transport.h"

mooncake-p2p-store/src/p2pstore/transfer_engine.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (engine *TransferEngine) submitTransfer(batchID BatchID, requests []Transfe
140140
}
141141

142142
ret := C.submitTransfer(engine.engine, C.batch_id_t(batchID), &requestSlice[0], C.size_t(len(requests)))
143-
if ret < 0 {
143+
if ret != 0 {
144144
return ErrTransferEngine
145145
}
146146
return nil
@@ -149,15 +149,15 @@ func (engine *TransferEngine) submitTransfer(batchID BatchID, requests []Transfe
149149
func (engine *TransferEngine) getTransferStatus(batchID BatchID, taskID int) (int, uint64, error) {
150150
var status C.transfer_status_t
151151
ret := C.getTransferStatus(engine.engine, C.batch_id_t(batchID), C.size_t(taskID), &status)
152-
if ret < 0 {
152+
if ret != 0 {
153153
return -1, 0, ErrTransferEngine
154154
}
155155
return int(status.status), uint64(status.transferred_bytes), nil
156156
}
157157

158158
func (engine *TransferEngine) freeBatchID(batchID BatchID) error {
159159
ret := C.freeBatchID(engine.engine, C.batch_id_t(batchID))
160-
if ret < 0 {
160+
if ret != 0 {
161161
return ErrTransferEngine
162162
}
163163
return nil

mooncake-store/src/client.cpp

+6-7
Original file line numberDiff line numberDiff line change
@@ -391,10 +391,10 @@ ErrorCode Client::TransferData(
391391
return ErrorCode::TRANSFER_FAIL;
392392
}
393393

394-
int error_code = transfer_engine_->submitTransfer(batch_id, transfer_tasks);
395-
if (error_code != 0) {
394+
Status s = transfer_engine_->submitTransfer(batch_id, transfer_tasks);
395+
if (!s.ok()) {
396396
LOG(ERROR) << "Failed to submit all transfers, error code is "
397-
<< error_code;
397+
<< s.code();
398398
transfer_engine_->freeBatchID(batch_id);
399399
return ErrorCode::TRANSFER_FAIL;
400400
}
@@ -415,11 +415,10 @@ ErrorCode Client::TransferData(
415415
}
416416
for (size_t i = 0; i < batch_size; ++i) {
417417
TransferStatus status;
418-
error_code =
419-
transfer_engine_->getTransferStatus(batch_id, i, status);
420-
if (error_code != 0) {
418+
s = transfer_engine_->getTransferStatus(batch_id, i, status);
419+
if (!s.ok()) {
421420
LOG(ERROR) << "Transfer " << i
422-
<< " error, error_code=" << error_code;
421+
<< " error, error_code=" << s.code();
423422
transfer_engine_->freeBatchID(batch_id);
424423
return ErrorCode::TRANSFER_FAIL;
425424
}

mooncake-transfer-engine/example/transfer_engine_bench.cpp

+10-9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <sstream>
2525
#include <unordered_map>
2626

27+
#include "common/base/status.h"
2728
#include "transfer_engine.h"
2829
#include "transport/transport.h"
2930

@@ -160,7 +161,7 @@ static inline std::string calculateRate(uint64_t data_bytes, double duration) {
160161
volatile bool running = true;
161162
std::atomic<size_t> total_batch_count(0);
162163

163-
int initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
164+
Status initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
164165
void *addr) {
165166
bindToSocket(thread_id % NR_SOCKETS);
166167
TransferRequest::OpCode opcode;
@@ -184,7 +185,7 @@ int initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
184185
size_t batch_count = 0;
185186
while (running) {
186187
auto batch_id = engine->allocateBatchID(FLAGS_batch_size);
187-
int ret = 0;
188+
Status s;
188189
std::vector<TransferRequest> requests;
189190
for (int i = 0; i < FLAGS_batch_size; ++i) {
190191
TransferRequest entry;
@@ -199,14 +200,14 @@ int initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
199200
requests.emplace_back(entry);
200201
}
201202

202-
ret = engine->submitTransfer(batch_id, requests);
203-
LOG_ASSERT(!ret);
203+
s = engine->submitTransfer(batch_id, requests);
204+
LOG_ASSERT(s.ok());
204205
for (int task_id = 0; task_id < FLAGS_batch_size; ++task_id) {
205206
bool completed = false;
206207
TransferStatus status;
207208
while (!completed) {
208-
int ret = engine->getTransferStatus(batch_id, task_id, status);
209-
LOG_ASSERT(!ret);
209+
Status s = engine->getTransferStatus(batch_id, task_id, status);
210+
LOG_ASSERT(s.ok());
210211
if (status.s == TransferStatusEnum::COMPLETED)
211212
completed = true;
212213
else if (status.s == TransferStatusEnum::FAILED) {
@@ -217,13 +218,13 @@ int initiatorWorker(TransferEngine *engine, SegmentID segment_id, int thread_id,
217218
}
218219
}
219220

220-
ret = engine->freeBatchID(batch_id);
221-
LOG_ASSERT(!ret);
221+
s = engine->freeBatchID(batch_id);
222+
LOG_ASSERT(s.ok());
222223
batch_count++;
223224
}
224225
LOG(INFO) << "Worker " << thread_id << " stopped!";
225226
total_batch_count.fetch_add(batch_count);
226-
return 0;
227+
return Status::OK();
227228
}
228229

229230
std::string formatDeviceNames(const std::string &device_names) {

0 commit comments

Comments
 (0)