2 changes: 1 addition & 1 deletion docker/Dockerfile.rocm
@@ -40,7 +40,7 @@ RUN apt-get update && \
RUN python${PY_VER} -m pip install --no-cache-dir build auditwheel pybind11

RUN python${PY_VER} -m pip install --no-cache-dir --pre torch torchvision \
--index-url https://download.pytorch.org/whl/nightly/rocm7.0
Member Author

Does rocm7.0 vs. rocm7.1 matter?

--index-url https://download.pytorch.org/whl/nightly/rocm7.1

RUN python${PY_VER} -m pip install --no-cache-dir --upgrade setuptools

12 changes: 12 additions & 0 deletions ep/bench/utils.py
@@ -584,6 +584,18 @@ def initialize_uccl(

ep.register_proxies(local_rank, proxies)

# Set atomic buffer pointer for all proxies BEFORE starting them
# This ensures the atomic buffer info is included in connection info exchange
# Note: Only thread 0's proxy allocates the atomic buffer in its constructor
if not is_intranode and len(proxies) > 0:
# Get atomic buffer pointer from thread 0 proxy (only thread 0 allocates it)
# This must be done before start_dual() so the atomic buffer info is included
# in the connection info exchange during init_common()
atomic_buffer_ptr = proxies[0].get_atomic_buffer_ptr()
if atomic_buffer_ptr:
for proxy in proxies:
proxy.set_atomic_buffer_ptr(atomic_buffer_ptr)

dist.barrier(group)
if not is_intranode:
for proxy in proxies:
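The ordering above matters because `init_common()` registers the atomic buffer and advertises it during the connection-info exchange. A minimal C++ sketch of the accessor pair the bench code calls is below; the class shape, member name, and allocation size are illustrative assumptions, not code from this patch.

```cpp
// Illustrative sketch only: models the contract that set_atomic_buffer_ptr()
// must run on every proxy before start_dual(), so init_common() can advertise
// the same buffer to peers. Everything except the two accessor names is assumed.
#include <cstdio>
#include <cstdlib>
#include <vector>

class ProxySketch {
 public:
  explicit ProxySketch(bool is_thread0) {
    // Stand-in for the real allocation; only thread 0 allocates.
    if (is_thread0) atomic_buffer_ptr_ = std::malloc(4096);
  }
  void* get_atomic_buffer_ptr() const { return atomic_buffer_ptr_; }
  void set_atomic_buffer_ptr(void* p) { atomic_buffer_ptr_ = p; }
  // init_common() would register atomic_buffer_ptr_ and place its
  // addr/len/rkey into the connection info, so it must be set before start.
  void start_dual() { std::printf("start with atomic buffer %p\n", atomic_buffer_ptr_); }

 private:
  void* atomic_buffer_ptr_ = nullptr;  // assumed member name
};

int main() {
  std::vector<ProxySketch> proxies;
  proxies.emplace_back(true);   // thread 0 allocates
  proxies.emplace_back(false);  // other threads do not
  void* ptr = proxies[0].get_atomic_buffer_ptr();
  for (auto& p : proxies) p.set_atomic_buffer_ptr(ptr);  // share before starting
  for (auto& p : proxies) p.start_dual();
  std::free(ptr);
}
```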
1 change: 1 addition & 0 deletions ep/include/common.hpp
@@ -12,6 +12,7 @@
#include <stdio.h>
#include <unistd.h>

// #define SOFTWARE_ORDERING
#define MAX_IB_DEVS 32
// #define MEASURE_PER_OP_LATENCY
// #define MEASURE_PER_VERB_LATENCY
6 changes: 6 additions & 0 deletions ep/include/proxy_ctx.hpp
@@ -59,6 +59,12 @@ struct ProxyCtx {
uint32_t remote_rkey = 0;
uint32_t rkey = 0;

// Atomic buffer (separate from main RDMA buffer)
ibv_mr* atomic_buffer_mr = nullptr; // MR for local atomic_buffer_ptr
uintptr_t remote_atomic_buffer_addr = 0; // Remote atomic_buffer_ptr address
uint64_t remote_atomic_buffer_len = 0; // Remote atomic_buffer_ptr length
uint32_t remote_atomic_buffer_rkey = 0; // Remote atomic_buffer_ptr rkey

// Buffer offset within rdma_buffer for address translation
uintptr_t dispatch_recv_data_offset =
0; // offset of dispatch_rdma_recv_data_buffer from rdma_buffer base
13 changes: 10 additions & 3 deletions ep/include/rdma.hpp
@@ -26,6 +26,11 @@ struct RDMAConnectionInfo {
uint16_t lid; // Local ID
uint8_t gid[16]; // Global ID for RoCE (optional)

// Atomic buffer info (separate from main GPU buffer)
uint32_t atomic_buffer_rkey = 0; // Atomic buffer memory region key
uintptr_t atomic_buffer_addr = 0; // Atomic buffer address
uint64_t atomic_buffer_len = 0; // Atomic buffer length

// #ifdef EFA
uint32_t num_rings;
uint32_t data_qp_num[kChannelPerProxy];
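For illustration, here is a minimal libibverbs sketch of how a sender could consume the three fields above once they have been exchanged: a remote fetch-and-add aimed at the peer's atomic buffer over a connected RC QP. The helper name, the 8-byte local scratch MR, and the slot-offset convention are assumptions; on EFA, where the MR is not registered with IBV_ACCESS_REMOTE_ATOMIC (see the proxy.cpp hunk later in this diff), the equivalent update would have to be CPU-emulated instead.

```cpp
// Sketch: post a remote fetch-and-add into the peer's atomic buffer using the
// addr/rkey advertised in RDMAConnectionInfo. Assumes qp, local_mr, and the
// remote_* values already exist; this is not the actual uccl call path.
#include <infiniband/verbs.h>
#include <cstdint>

int post_remote_fetch_add(ibv_qp* qp, ibv_mr* local_mr,
                          uintptr_t remote_atomic_buffer_addr,
                          uint32_t remote_atomic_buffer_rkey,
                          uint64_t slot_offset, uint64_t add_value) {
  ibv_sge sge{};
  sge.addr = reinterpret_cast<uint64_t>(local_mr->addr);  // old value lands here
  sge.length = sizeof(uint64_t);                          // atomics operate on 8 bytes
  sge.lkey = local_mr->lkey;

  ibv_send_wr wr{};
  wr.opcode = IBV_WR_ATOMIC_FETCH_AND_ADD;
  wr.sg_list = &sge;
  wr.num_sge = 1;
  wr.send_flags = IBV_SEND_SIGNALED;
  wr.wr.atomic.remote_addr = remote_atomic_buffer_addr + slot_offset;  // must be 8-byte aligned
  wr.wr.atomic.rkey = remote_atomic_buffer_rkey;
  wr.wr.atomic.compare_add = add_value;

  ibv_send_wr* bad = nullptr;
  return ibv_post_send(qp, &wr, &bad);  // 0 on success
}
```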
@@ -283,13 +288,14 @@ struct BarrierImm {
// [28:8]=SEQ (21 bits), [7:0]=SRC_RANK
static constexpr uint32_t kCtrlBit = 1u << 30;
static constexpr uint32_t kAckBit = 1u << 29;
static constexpr uint32_t kSeqMask = 0x1FFFFFu;
static inline bool IsAck(uint32_t imm) { return (imm & kAckBit) != 0u; }
static inline uint32_t Pack(bool ack, uint32_t seq, uint8_t src_rank) {
return kCtrlBit | (ack ? kAckBit : 0u) |
((seq & 0x1FFFFFu) << 8) // 21 bits for seq
((seq & kSeqMask) << 8) // 21 bits for seq
| uint32_t(src_rank);
}
static inline uint32_t Seq(uint32_t imm) { return (imm >> 8) & 0x1FFFFFu; }
static inline uint32_t Seq(uint32_t imm) { return (imm >> 8) & kSeqMask; }
static inline uint8_t Rank(uint32_t imm) { return imm & 0xFFu; }
explicit BarrierImm(uint32_t imm = 0) : value(imm) {}
bool GetIsAck() const { return IsAck(value); }
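A standalone worked example of the immediate layout above, showing why any sequence counter compared against the wire value has to be masked to 21 bits (the same masking Proxy::send_barrier adopts later in this diff):

```cpp
// Illustration of the BarrierImm packing: seq occupies bits [28:8], so a local
// counter must wrap with kSeqMask before it is packed or compared.
#include <cassert>
#include <cstdint>
#include <cstdio>

constexpr uint32_t kCtrlBit = 1u << 30;
constexpr uint32_t kAckBit = 1u << 29;
constexpr uint32_t kSeqMask = 0x1FFFFFu;

uint32_t Pack(bool ack, uint32_t seq, uint8_t src_rank) {
  return kCtrlBit | (ack ? kAckBit : 0u) | ((seq & kSeqMask) << 8) | uint32_t(src_rank);
}
uint32_t Seq(uint32_t imm) { return (imm >> 8) & kSeqMask; }
uint8_t Rank(uint32_t imm) { return uint8_t(imm & 0xFFu); }

int main() {
  uint32_t local_seq = kSeqMask;           // about to wrap
  local_seq = (local_seq + 1) & kSeqMask;  // wraps to 0, as in send_barrier
  uint32_t imm = Pack(/*ack=*/true, local_seq, /*src_rank=*/3);
  assert(Seq(imm) == local_seq && Rank(imm) == 3);
  std::printf("imm=0x%08x seq=%u rank=%u\n", imm, Seq(imm), Rank(imm));
}
```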
@@ -329,7 +335,8 @@ void remote_process_completions(
int my_rank, int num_nodes, bool use_normal_mode = false);
void create_per_thread_qp(ProxyCtx& S, void* gpu_buffer, size_t size,
RDMAConnectionInfo* local_info, int rank,
size_t num_rings, bool use_normal_mode);
size_t num_rings, bool use_normal_mode,
void* atomic_buffer_ptr = nullptr);
ibv_cq* create_per_thread_cq(ProxyCtx& S);
void remote_poll_completions(ProxyCtx& S, int idx, CopyRingBuffer& g_ring,
std::vector<ProxyCtx*>& ctx_by_tag,
46 changes: 32 additions & 14 deletions ep/src/internode.cu
@@ -720,7 +720,7 @@ __global__ void __launch_bounds__(
// Read RDMA rank existence
uint64_t is_token_in_rank_uint64 = 0;
if (lane_id < kNumRDMARanks) {
is_token_in_rank_uint64 = __ldg(reinterpret_cast<uint64_t const*>(
is_token_in_rank_uint64 = *(reinterpret_cast<uint64_t const*>(
is_token_in_rank + token_idx * num_ranks +
lane_id * NUM_MAX_NVL_PEERS));
}
@@ -1025,10 +1025,20 @@ __global__ void __launch_bounds__(
num_bytes_per_msg,
translate_dst_rdma_rank<kLowLatencyMode>(dst_rdma_rank, nvl_rank),
channel_id, // NOTE(MaoZiming): use channel_id for rb.
lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false, -1,
lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false,
// NOTE(MaoZiming): for AMD GPUs, we directly send a subsequent RDMA
// to update the tail. For other GPUs and EFA NICs, we use the
// CPU-emulated atomics, which allows us to piggyback the atomic operation
// with the RDMA send.
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
-1,
#else
-1,
reinterpret_cast<uint64_t>(rdma_channel_tail.buffer(rdma_rank)) -
reinterpret_cast<uint64_t>(original_atomic_buffer_ptr),
num_tokens_to_issue);
num_tokens_to_issue
#endif
);
} else {
// Lighter fence for local RDMA rank
memory_fence();
@@ -1046,7 +1056,13 @@
translate_dst_rdma_rank<kLowLatencyMode>(dst_rdma_rank, nvl_rank),
channel_id, // NOTE(MaoZiming): use channel_id for rb.
dst_rdma_rank == rdma_rank, d2h_channel_addrs,
num_d2h_channel_addrs, false, -1, true);
num_d2h_channel_addrs, false, -1,
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
false
#else
true
#endif
);
}
__syncwarp();
}
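To make the comment in this hunk concrete, here is a hypothetical host-side model of the two tail-update paths: `ProxyCommand`, `handle_command`, and the `-1` sentinel layout are illustrative stand-ins, not the actual D2H ring entry used by the code.

```cpp
// Hypothetical model only: on AMD the GPU issues a second RDMA write that
// stores the new tail itself, so no atomic is piggybacked; elsewhere the CPU
// proxy applies a piggybacked add at an offset inside the peer's atomic buffer.
#include <cstdint>
#include <cstdio>

struct ProxyCommand {
  uint64_t data_len = 0;
  int64_t atomic_offset = -1;  // -1: no piggybacked atomic (AMD path)
  uint32_t atomic_value = 0;   // tokens to add to the remote tail otherwise
};

void handle_command(ProxyCommand const& cmd, uint64_t* remote_atomic_buffer) {
  // 1. Issue the RDMA write for cmd.data_len bytes (omitted here).
  if (cmd.atomic_offset >= 0) {
    // 2a. Non-AMD / EFA path: apply the tail increment at the given offset
    //     inside the peer's atomic buffer (emulated locally in this sketch).
    remote_atomic_buffer[cmd.atomic_offset / sizeof(uint64_t)] += cmd.atomic_value;
  }
  // 2b. AMD path: no offset supplied; a separate RDMA write carries the tail.
}

int main() {
  uint64_t fake_remote[8] = {};
  handle_command({/*data_len=*/4096, /*atomic_offset=*/0, /*atomic_value=*/7}, fake_remote);
  handle_command({/*data_len=*/4096, /*atomic_offset=*/-1, /*atomic_value=*/0}, fake_remote);
  std::printf("tail slot = %llu\n", (unsigned long long)fake_remote[0]);
}
```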
@@ -1178,9 +1194,6 @@ __global__ void __launch_bounds__(
trap();
}
}
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
memory_fence();
#endif
auto src_rdma_head =
__shfl_sync(WARP_MASK, cached_rdma_channel_head, src_rdma_rank);
auto src_rdma_tail =
@@ -1249,13 +1262,8 @@ __global__ void __launch_bounds__(
// Move tail index
__syncwarp();
if (lane_id == 0)
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
__atomic_store_n(nvl_channel_tail.buffer(), cached_nvl_channel_tail,
__ATOMIC_RELEASE);
#else
st_release_sys_global(nvl_channel_tail.buffer(),
cached_nvl_channel_tail);
#endif
}
// Retired
__syncwarp();
@@ -2611,10 +2619,14 @@ __global__ void __launch_bounds__((kNumForwarders + 1) * WARP_SIZE, 1)
nvl_rank),
channel_id, // NOTE(MaoZiming): use channel_id for rb.
lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false, -1,
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#else
reinterpret_cast<uint64_t>(
rdma_channel_tail.buffer(rdma_rank)) -
reinterpret_cast<uint64_t>(original_atomic_buffer_ptr),
num_chunked_tokens);
num_chunked_tokens
#endif
);
} else {
memory_fence();
}
@@ -2630,7 +2642,13 @@
nvl_rank),
channel_id, // NOTE(MaoZiming): use warp_id for rb.
dst_rdma_rank == rdma_rank, d2h_channel_addrs,
num_d2h_channel_addrs, false, -1, true);
num_d2h_channel_addrs, false, -1,
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
false
#else
true
#endif
);
}
}
}
70 changes: 60 additions & 10 deletions ep/src/proxy.cpp
@@ -45,7 +45,7 @@ LocalBarrier* map_local_barrier_shm(std::string const& name, bool* out_owner) {
perror("shm_open(existing)");
return nullptr;
}
struct stat st {};
struct stat st{};
int tries = 1000;
while (tries-- > 0) {
if (fstat(fd, &st) == 0 && static_cast<size_t>(st.st_size) >= kSize)
@@ -176,6 +176,32 @@ void Proxy::init_common() {
cfg_.thread_idx, cfg_.local_rank);
pin_thread_to_numa_wrapper();
if (!ctx_.cq) ctx_.cq = create_per_thread_cq(ctx_);

// Register atomic_buffer_ptr as a separate RDMA memory region if it was set
// This must be done after PD is initialized by per_thread_rdma_init
if (atomic_buffer_ptr_ && !ctx_.atomic_buffer_mr) {
ctx_.atomic_buffer_mr =
ibv_reg_mr(ctx_.pd, atomic_buffer_ptr_, kAtomicBufferSize,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
#ifdef EFA
IBV_ACCESS_REMOTE_READ
#else
IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC
#endif
);

if (!ctx_.atomic_buffer_mr) {
perror("Failed to register atomic_buffer_ptr MR");
std::abort();
}

fprintf(stderr,
"[Proxy] Registered atomic_buffer_ptr MR: addr=0x%llx, len=%zu, "
"rkey=0x%x\n",
(unsigned long long)ctx_.atomic_buffer_mr->addr,
(size_t)ctx_.atomic_buffer_mr->length, ctx_.atomic_buffer_mr->rkey);
}

if (ctxs_for_all_ranks_.empty()) {
fprintf(stderr,
"Error: peers metadata not set before init_common (peers_.size() "
@@ -195,6 +221,14 @@ void Proxy::init_common() {
ctx_.atomic_old_values_buf =
reinterpret_cast<uint32_t*>(static_cast<uint8_t*>(cfg_.gpu_buffer) +
cfg_.total_size - atomic_buf_size);
// Check alignment - only abort if not aligned (8-byte alignment required for
// 64-bit atomics)
if ((reinterpret_cast<uintptr_t>(ctx_.atomic_old_values_buf) & 0x7) != 0) {
fprintf(stderr, "Atomic buffer not 8-byte aligned: 0x%llx\n",
(unsigned long long)reinterpret_cast<uintptr_t>(
ctx_.atomic_old_values_buf));
std::abort();
}

int num_ranks = ctxs_for_all_ranks_.size();
local_infos_.assign(num_ranks, RDMAConnectionInfo{});
@@ -218,6 +252,8 @@ void Proxy::init_common() {
// NOTE(MaoZiming): each context can share the same cq, pd, mr.
// but the qp must be different.
c.cq = ctx_.cq;
// Share the atomic buffer MR with peer contexts
c.atomic_buffer_mr = ctx_.atomic_buffer_mr;

if (peer == my_rank) continue;
// Skip rdma connection for intra-node.
@@ -226,7 +262,7 @@ void Proxy::init_common() {
continue;
create_per_thread_qp(c, cfg_.gpu_buffer, cfg_.total_size,
&local_infos_[peer], my_rank, cfg_.d2h_queues.size(),
cfg_.use_normal_mode);
cfg_.use_normal_mode, atomic_buffer_ptr_);
modify_qp_to_init(c);
}
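Presumably the new `atomic_buffer_ptr` argument lets `create_per_thread_qp()` fill the atomic-buffer fields of the local `RDMAConnectionInfo` before the exchange. That code is not part of this diff, so the sketch below is an assumption about what that step looks like rather than the actual implementation.

```cpp
// Assumed sketch of advertising the registered atomic-buffer MR to peers via
// the new RDMAConnectionInfo fields added in rdma.hpp; the real population is
// expected to live inside create_per_thread_qp(), which is not shown here.
#include <infiniband/verbs.h>
#include <cstdint>

struct RDMAConnectionInfoSketch {  // mirrors the three fields added in rdma.hpp
  uint32_t atomic_buffer_rkey = 0;
  uintptr_t atomic_buffer_addr = 0;
  uint64_t atomic_buffer_len = 0;
};

void advertise_atomic_buffer(ibv_mr const* atomic_buffer_mr,
                             RDMAConnectionInfoSketch* local_info) {
  if (!atomic_buffer_mr) return;  // leave zeros; the peer then logs the warning below
  local_info->atomic_buffer_addr =
      reinterpret_cast<uintptr_t>(atomic_buffer_mr->addr);
  local_info->atomic_buffer_len = atomic_buffer_mr->length;
  local_info->atomic_buffer_rkey = atomic_buffer_mr->rkey;
}
```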

@@ -291,12 +327,24 @@ void Proxy::init_common() {
c.remote_addr = remote_infos_[peer].addr;
c.remote_rkey = remote_infos_[peer].rkey;
c.remote_len = remote_infos_[peer].len;
if (FILE* f = fopen("/tmp/uccl_debug.txt", "a")) {

// Set remote atomic buffer info from exchanged connection info
c.remote_atomic_buffer_addr = remote_infos_[peer].atomic_buffer_addr;
c.remote_atomic_buffer_len = remote_infos_[peer].atomic_buffer_len;
c.remote_atomic_buffer_rkey = remote_infos_[peer].atomic_buffer_rkey;

if (c.remote_atomic_buffer_addr == 0) {
fprintf(
f,
"[PROXY_INIT] me=%d peer=%d: remote_addr=0x%lx local_buffer=0x%lx\n",
my_rank, peer, c.remote_addr, (uintptr_t)cfg_.gpu_buffer);
fclose(f);
stderr,
"[Proxy] WARNING: Remote atomic buffer not registered for peer %d "
"(local atomic_buffer_ptr=%p, local atomic_buffer_mr=%p)\n",
peer, atomic_buffer_ptr_, (void*)ctx_.atomic_buffer_mr);
} else {
fprintf(stderr,
"[Proxy] Remote atomic buffer info for peer %d: addr=0x%llx, "
"len=%zu, rkey=0x%x\n",
peer, (unsigned long long)c.remote_atomic_buffer_addr,
(size_t)c.remote_atomic_buffer_len, c.remote_atomic_buffer_rkey);
}
}
usleep(50 * 1000);
@@ -456,7 +504,7 @@ void Proxy::run_dual() {
void Proxy::notify_gpu_completion(uint64_t& my_tail) {
if (acked_wrs_.empty()) return;

// Mark all acked command slots in each ring's bitmask
// Mark all acked command slots in each ring's bitmask
#ifdef USE_MSCCLPP_FIFO_BACKEND
// FIFO path: pop in order using the pending deque and the completion set.
for (size_t rb_idx = 0; rb_idx < cfg_.d2h_queues.size(); ++rb_idx) {
@@ -1088,7 +1136,7 @@ void Proxy::send_barrier(uint64_t wr) {
#endif
assert(ctx_.barrier_wr == -1 && "barrier_wr should be 0");
ctx_.barrier_wr = wr;
ctx_.barrier_seq = ctx_.barrier_seq + 1;
ctx_.barrier_seq = (ctx_.barrier_seq + 1) & BarrierImm::kSeqMask;

if (cfg_.rank == ctx_.node_leader_rank) {
if (ctx_.barrier_arrived.size() != static_cast<size_t>(cfg_.num_nodes)) {
@@ -1166,7 +1214,9 @@ void Proxy::barrier_check() {
}

// When global release comes back (CQ handler should set these):
if (ctx_.barrier_released && ctx_.barrier_release_seq == seq) {
// NOTE: the BarrierImm seq field is 21 bits, so the local seq must be masked.
if (ctx_.barrier_released &&
ctx_.barrier_release_seq == seq) {
// Reset local mask for next barrier and consume the global release
ctx_.barrier_released = false;
