Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ docker run --rm --user "$(id -u):$(id -g)" \
-e USE_TCPX="${USE_TCPX:-0}" \
-e USE_EFA="${USE_EFA:-0}" \
-e USE_IB="${USE_IB:-0}" \
-e USE_TCP="${USE_TCP:-0}" \
-e MAKE_NORMAL_MODE="${MAKE_NORMAL_MODE:-}" \
-e TORCH_CUDA_ARCH_LIST="${TORCH_CUDA_ARCH_LIST:-}" \
-e FUNCTION_DEF="$(declare -f build_rccl_nccl_h build_ccl_rdma build_ccl_efa build_p2p build_ep build_eccl)" \
Expand Down
23 changes: 21 additions & 2 deletions p2p/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ USE_EFA ?= $(shell echo $${USE_EFA:-0})
# IB optional integration
USE_IB ?= $(shell echo $${USE_IB:-0})

# TCP endpoint integration
USE_TCP ?= $(shell echo $${USE_TCP:-0})

# Compiler and flags
CUDA_HOME ?= /usr/local/cuda
CUDA_INC := $(CUDA_HOME)/include
Expand Down Expand Up @@ -38,6 +41,11 @@ else
LIBS2 += -lglog -lgflags -lgtest -lz -lelf -lpthread -libverbs
endif

# Add TCP endpoint flag
ifeq ($(USE_TCP),1)
CXXFLAGS += -DUCCL_P2P_USE_TCP
endif

# Python and pybind11 configuration
PYTHON ?= python3
PYTHON_CONFIG = $(PYTHON)-config
Expand Down Expand Up @@ -73,8 +81,13 @@ CXXFLAGS += -DUCCL_P2P_USE_TCPX -I$(NCCL_INC)
LDFLAGS += -L$(NCCL_LIB) -lnccl -Wl,-rpath,$(NCCL_LIB)
OBJECTS := $(CORE_OBJECTS)
else
SOURCES := engine.cc engine_pybind.cc
CORE_OBJECT := engine.o
SOURCES := engine.cc engine_pybind.cc tcp/tcp_endpoint.cc tcp/tcp_worker_pool.cc
CORE_OBJECT := engine.o tcp/tcp_endpoint.o tcp/tcp_worker_pool.o
endif

ifeq ($(USE_TCPX),1)
OBJECTS := $(CORE_OBJECTS)
else
OBJECTS := $(SOURCES:.cc=.o)
endif

Expand Down Expand Up @@ -121,6 +134,11 @@ endif
%.o: %.cc
$(CXX) $(CXXFLAGS) $(PYBIND11_INCLUDES) -c $< -o $@

# Compile TCP source files
tcp/%.o: tcp/%.cc
@mkdir -p tcp
$(CXX) $(CXXFLAGS) $(PYBIND11_INCLUDES) -I. -c $< -o $@

ifeq ($(USE_TCPX),1)
install: $(P2P_SHARED_LIB)
install -m 755 $(P2P_SHARED_LIB) $(LIBDIR)/
Expand All @@ -138,6 +156,7 @@ endif
# Clean build artifacts
clean:
rm -f $(OBJECTS) $(CAPI_OBJECT) $(P2P_SHARED_LIB)
rm -f tcp/*.o
ifneq ($(USE_TCPX),1)
rm -f $(P2P_PYTHON_EXT) $(RDMA_PLUGIN_LIB) $(RDMA_OBJECTS)
endif
Expand Down
18 changes: 16 additions & 2 deletions p2p/Makefile.rocm
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ USE_EFA ?= $(shell echo $${USE_EFA:-0})
# IB optional integration
USE_IB ?= $(shell echo $${USE_IB:-0})

# TCP endpoint integration
USE_TCP ?= $(shell echo $${USE_TCP:-0})

# Compiler and flags
HIP_HOME?=/opt/rocm
HIP_INC := $(HIP_HOME)/include
Expand Down Expand Up @@ -39,6 +42,11 @@ LIBDIR ?= $(PREFIX)/lib
INCDIR ?= $(PREFIX)/include

CXXFLAGS += -D__HIP_PLATFORM_AMD__

# Add TCP endpoint flag
ifeq ($(USE_TCP),1)
CXXFLAGS += -DUCCL_P2P_USE_TCP
endif
LDFLAGS = -L$(HIP_LIB) -lamdhip64 \
-Wl,-rpath,$(HIP_LIB) -L${CONDA_LIB_HOME} -lglog -lgflags -lgtest \
-lz -lelf -libverbs -lpthread
Expand All @@ -47,8 +55,8 @@ LDFLAGS = -L$(HIP_LIB) -lamdhip64 \
P2P_PYTHON_EXT := p2p$(PYEXT)
P2P_SHARED_LIB := libuccl_p2p.so
RDMA_PLUGIN_LIB := librdma_plugin.a
SOURCES := engine.cc engine_pybind.cc
OBJECTS := $(SOURCES:.cc=.o)
SOURCES := engine.cc engine_pybind.cc tcp/tcp_endpoint.cc tcp/tcp_worker_pool.cc
OBJECTS := engine.o engine_pybind.o tcp/tcp_endpoint.o tcp/tcp_worker_pool.o
CAPI_SOURCE := uccl_engine.cc
CAPI_HEADER := uccl_engine.h
CAPI_OBJECT := $(CAPI_SOURCE:.cc=.o)
Expand Down Expand Up @@ -84,6 +92,11 @@ $(RDMA_OBJECTS): %.o: $(RDMA_HOME)/%.cc
%.o: %.cc
$(CXX) $(CXXFLAGS) $(PYBIND11_INCLUDES) -c $< -o $@

# Compile TCP source files
tcp/%.o: tcp/%.cc
@mkdir -p tcp
$(CXX) $(CXXFLAGS) $(PYBIND11_INCLUDES) -I. -c $< -o $@

# Install the module
install: $(P2P_PYTHON_EXT) $(P2P_SHARED_LIB)
@mkdir -p $(INSTALL_DIR)
Expand All @@ -95,6 +108,7 @@ install: $(P2P_PYTHON_EXT) $(P2P_SHARED_LIB)
# Clean build artifacts
clean:
rm -f $(OBJECTS) $(CAPI_OBJECT) $(P2P_SHARED_LIB) $(P2P_PYTHON_EXT) $(RDMA_PLUGIN_LIB) $(RDMA_OBJECTS)
rm -f tcp/*.o
make -C $(RDMA_HOME) -f Makefile.rocm clean -j$(nproc)

# Test the module
Expand Down
116 changes: 114 additions & 2 deletions p2p/endpoint_wrapper.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#pragma once
#include "engine.h"

#ifdef UCCL_P2P_USE_TCP
#include "tcp/tcp_endpoint.h"
#endif

namespace unified {

template <class T>
Expand All @@ -19,6 +23,12 @@ inline void delete_ep(RDMAEndPoint const& s) {
else if constexpr (std::is_same_v<T, std::shared_ptr<NICEndpoint>>) {
// shared_ptr: do nothing (shared_ptr handles lifetime)
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
// shared_ptr: do nothing (shared_ptr handles lifetime)
}
#endif
else {
static_assert(always_false<T>::value,
Expand Down Expand Up @@ -53,15 +63,50 @@ inline int set_request(std::shared_ptr<NICEndpoint> const& obj, Conn* conn,
}
#endif

#ifdef UCCL_P2P_USE_TCP
inline int tcp_set_request_write(std::shared_ptr<tcp::TCPEndpoint> const& obj,
Conn* conn, unified::P2PMhandle* local_mh,
void* src, size_t size,
FifoItem const& slot_item,
uccl::ucclRequest* ureq) {
ureq->type = uccl::ReqType::ReqWrite;
ureq->n = conn->uccl_conn_id_.flow_id;
return obj->uccl_write_async(
reinterpret_cast<uccl::UcclFlow*>(conn->uccl_conn_id_.context), nullptr,
src, size, slot_item, ureq);
}

inline int tcp_set_request_read(std::shared_ptr<tcp::TCPEndpoint> const& obj,
Conn* conn, unified::P2PMhandle* local_mh,
void* dst, size_t size,
FifoItem const& slot_item,
uccl::ucclRequest* ureq) {
ureq->type = uccl::ReqType::ReqRead;
ureq->n = conn->uccl_conn_id_.flow_id;
return obj->uccl_read_async(
reinterpret_cast<uccl::UcclFlow*>(conn->uccl_conn_id_.context), nullptr,
dst, size, slot_item, ureq);
}
#endif

inline uccl::ConnID uccl_connect(RDMAEndPoint const& s, int dev,
int local_gpuidx, int remote_dev,
int remote_gpuidx, std::string remote_ip,
uint16_t remote_port) {
return std::visit(
[dev, local_gpuidx, remote_dev, remote_gpuidx, remote_ip,
remote_port](auto&& obj) -> uccl::ConnID {
return obj->uccl_connect(dev, local_gpuidx, remote_dev, remote_gpuidx,
remote_ip, remote_port);
using T = std::decay_t<decltype(obj)>;
#ifdef UCCL_P2P_USE_TCP
if constexpr (std::is_same_v<T, std::shared_ptr<tcp::TCPEndpoint>>) {
return obj->uccl_connect(dev, local_gpuidx, remote_dev, remote_gpuidx,
remote_ip, remote_port);
} else
#endif
{
return obj->uccl_connect(dev, local_gpuidx, remote_dev, remote_gpuidx,
remote_ip, remote_port);
}
},
s);
}
Expand Down Expand Up @@ -115,6 +160,13 @@ inline bool uccl_regmr(RDMAEndPoint const& s, int dev, void* data, size_t len,
return false;
}
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
// TCP doesn't need memory registration
mhandle->mhandle_ = nullptr;
}
#endif
else {
static_assert(always_false<T>::value,
Expand Down Expand Up @@ -157,6 +209,16 @@ inline int uccl_send_async(RDMAEndPoint const& s, Conn* conn,
ureq->n = conn->uccl_conn_id_.flow_id;
return ureq->engine_idx;
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
ureq->type = uccl::ReqType::ReqTx;
ureq->n = conn->uccl_conn_id_.flow_id;
return obj->uccl_send_async(
reinterpret_cast<uccl::UcclFlow*>(conn->uccl_conn_id_.context),
nullptr, data, size, ureq);
}
#endif
else {
static_assert(always_false<T>::value,
Expand Down Expand Up @@ -190,6 +252,16 @@ inline int uccl_recv_async(RDMAEndPoint const& s, Conn* conn,
ureq->n = conn->uccl_conn_id_.flow_id;
return ureq->engine_idx;
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
ureq->type = uccl::ReqType::ReqRx;
ureq->n = conn->uccl_conn_id_.flow_id;
return obj->uccl_recv_async(
reinterpret_cast<uccl::UcclFlow*>(conn->uccl_conn_id_.context),
nullptr, data, size, n, ureq);
}
#endif
else {
static_assert(always_false<T>::value,
Expand Down Expand Up @@ -220,6 +292,13 @@ inline bool uccl_poll_ureq_once(RDMAEndPoint const& s,
return obj->checkRecvComplete_once(ureq->n, ureq->engine_idx);
}
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
// TCP operations are blocking, so always complete immediately
return obj->uccl_poll_ureq_once(ureq);
}
#endif
else {
static_assert(always_false<T>::value,
Expand Down Expand Up @@ -248,6 +327,13 @@ inline int uccl_read_async(RDMAEndPoint const& s, Conn* conn,
ureq->type = uccl::ReqType::ReqRead;
return set_request(obj, conn, local_mh, dst, size, slot_item, ureq);
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
return tcp_set_request_read(obj, conn, local_mh, dst, size, slot_item,
ureq);
}
#endif
else {
static_assert(always_false<T>::value,
Expand Down Expand Up @@ -276,6 +362,13 @@ inline int uccl_write_async(RDMAEndPoint const& s, Conn* conn,
ureq->type = uccl::ReqType::ReqWrite;
return set_request(obj, conn, local_mh, src, size, slot_item, ureq);
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
return tcp_set_request_write(obj, conn, local_mh, src, size,
slot_item, ureq);
}
#endif
else {
static_assert(always_false<T>::value,
Expand Down Expand Up @@ -312,6 +405,19 @@ inline int prepare_fifo_metadata(RDMAEndPoint const& s, Conn* conn,
uccl::serialize_fifo_item(remote_mem_info, out_buf);
return 0;
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
// For TCP, just store address and size (no rkeys needed)
FifoItem remote_mem_info;
remote_mem_info.addr = reinterpret_cast<uint64_t>(data);
remote_mem_info.size = size;
std::memset(remote_mem_info.padding, 0,
sizeof(remote_mem_info.padding));
uccl::serialize_fifo_item(remote_mem_info, out_buf);
return 0;
}
#endif
else {
static_assert(always_false<T>::value,
Expand All @@ -334,6 +440,12 @@ inline void uccl_deregmr(RDMAEndPoint const& s, P2PMhandle* mhandle) {
else if constexpr (std::is_same_v<T, std::shared_ptr<NICEndpoint>>) {
obj->uccl_deregmr(mhandle->mr_array);
}
#endif
#ifdef UCCL_P2P_USE_TCP
else if constexpr (std::is_same_v<T,
std::shared_ptr<tcp::TCPEndpoint>>) {
// TCP doesn't need memory deregistration - no-op
}
#endif
else {
static_assert(always_false<T>::value,
Expand Down
23 changes: 21 additions & 2 deletions p2p/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ Endpoint::Endpoint(uint32_t const local_gpu_idx, uint32_t const num_cpus)
ep_ = std::shared_ptr<NICEndpoint>(
new NICEndpoint(local_gpu_idx_, INVALID_RANK_ID, 0, false));
numa_node_ = 0;
#elif defined(UCCL_P2P_USE_TCP)
ep_ = std::make_shared<tcp::TCPEndpoint>(local_gpu_idx_, 0);
numa_node_ = 0;
// Initialize GPU to device mapping (use 0 for TCP since no RDMA devices)
for (int i = 0; i < kMaxNumGPUs; i++) {
gpu_to_dev[i] = 0;
}
#else
ep_ = new uccl::RDMAEndpoint(num_cpus_);

Expand Down Expand Up @@ -130,6 +137,15 @@ Endpoint::Endpoint(uint32_t const num_cpus) : num_cpus_(num_cpus) {
#ifdef UCCL_P2P_USE_NATIVE_RDMA
ep_ = std::shared_ptr<NICEndpoint>(
new NICEndpoint(local_gpu_idx_, INVALID_RANK_ID, 0, false));
#elif defined(UCCL_P2P_USE_TCP)
// Initialize the TCP endpoint
ep_ = std::make_shared<tcp::TCPEndpoint>(local_gpu_idx_, 0);
// Initialize GPU to device mapping (use 0 for TCP since no RDMA devices)
int ngpus_detected = 0;
GPU_RT_CHECK(gpuGetDeviceCount(&ngpus_detected));
for (int i = 0; i < kMaxNumGPUs; i++) {
gpu_to_dev[i] = 0;
}
#else
// Initialize the RDMA endpoint with lazy creation.
ep_ = new uccl::RDMAEndpoint(num_cpus_);
Expand Down Expand Up @@ -198,14 +214,18 @@ void Endpoint::initialize_engine() {
GPU_RT_CHECK(gpuStreamCreateWithFlags(&streams_[i], gpuStreamNonBlocking));
}

#if defined(UCCL_P2P_USE_TCP)
numa_node_ = 0; // TCP doesn't have RDMA devices
#else
numa_node_ =
uccl::RDMAFactory::get_factory_dev(gpu_to_dev[local_gpu_idx_])->numa_node;
#endif

// Initialize the engine based on the GPU index.
std::cout << "Lazy creation of engine, GPU index: " << local_gpu_idx_
<< std::endl;
// Initialize engine by fixed engine offset since we did lazy initialization
#ifndef UCCL_P2P_USE_NATIVE_RDMA
#if !defined(UCCL_P2P_USE_NATIVE_RDMA) && !defined(UCCL_P2P_USE_TCP)
unified::initialize_engine_by_dev(ep_, gpu_to_dev[local_gpu_idx_], false);
std::cout << "Engine initialized for GPU " << local_gpu_idx_ << std::endl;
#endif
Expand Down Expand Up @@ -581,7 +601,6 @@ bool Endpoint::recv(uint64_t conn_id, uint64_t mr_id, void* data, size_t size) {
}
if (unified::uccl_poll_ureq_once(ep_, &ureq[i % kMaxInflightChunks])) {
// Just mark it as completed, DO NOT increment ureq_finished here.
LOG(INFO) << "chunk recv::::" << i;
done[i % kMaxInflightChunks] = true;
}
}
Expand Down
Loading