diff --git a/docker/Dockerfile.rocm b/docker/Dockerfile.rocm
index 6114cf49a..d08028f80 100644
--- a/docker/Dockerfile.rocm
+++ b/docker/Dockerfile.rocm
@@ -40,7 +40,7 @@ RUN apt-get update && \
 RUN python${PY_VER} -m pip install --no-cache-dir build auditwheel pybind11
 
 RUN python${PY_VER} -m pip install --no-cache-dir --pre torch torchvision \
-    --index-url https://download.pytorch.org/whl/nightly/rocm7.0
+    --index-url https://download.pytorch.org/whl/nightly/rocm7.1
 
 RUN python${PY_VER} -m pip install --no-cache-dir --upgrade setuptools
diff --git a/ep/bench/utils.py b/ep/bench/utils.py
index 732c2d150..26c493212 100644
--- a/ep/bench/utils.py
+++ b/ep/bench/utils.py
@@ -584,6 +584,18 @@ def initialize_uccl(
     ep.register_proxies(local_rank, proxies)
 
+    # Set the atomic buffer pointer for all proxies BEFORE starting them.
+    # This ensures the atomic buffer info is included in the connection info exchange.
+    # Note: only thread 0's proxy allocates the atomic buffer in its constructor.
+    if not is_intranode and len(proxies) > 0:
+        # Get the atomic buffer pointer from the thread-0 proxy (only thread 0 allocates it).
+        # This must be done before start_dual() so the atomic buffer info is included
+        # in the connection info exchange during init_common().
+        atomic_buffer_ptr = proxies[0].get_atomic_buffer_ptr()
+        if atomic_buffer_ptr:
+            for proxy in proxies:
+                proxy.set_atomic_buffer_ptr(atomic_buffer_ptr)
+
     dist.barrier(group)
     if not is_intranode:
         for proxy in proxies:
diff --git a/ep/include/common.hpp b/ep/include/common.hpp
index 25c6ee08a..df40af027 100644
--- a/ep/include/common.hpp
+++ b/ep/include/common.hpp
@@ -12,6 +12,7 @@
 #include
 #include
 
+// #define SOFTWARE_ORDERING
 #define MAX_IB_DEVS 32
 // #define MEASURE_PER_OP_LATENCY
 // #define MEASURE_PER_VERB_LATENCY
diff --git a/ep/include/proxy_ctx.hpp b/ep/include/proxy_ctx.hpp
index f87f70ce0..d88c46ac0 100644
--- a/ep/include/proxy_ctx.hpp
+++ b/ep/include/proxy_ctx.hpp
@@ -59,6 +59,12 @@ struct ProxyCtx {
   uint32_t remote_rkey = 0;
   uint32_t rkey = 0;
 
+  // Atomic buffer (separate from the main RDMA buffer)
+  ibv_mr* atomic_buffer_mr = nullptr;       // MR for local atomic_buffer_ptr
+  uintptr_t remote_atomic_buffer_addr = 0;  // Remote atomic_buffer_ptr address
+  uint64_t remote_atomic_buffer_len = 0;    // Remote atomic_buffer_ptr length
+  uint32_t remote_atomic_buffer_rkey = 0;   // Remote atomic_buffer_ptr rkey
+
   // Buffer offset within rdma_buffer for address translation
   uintptr_t dispatch_recv_data_offset =
       0;  // offset of dispatch_rdma_recv_data_buffer from rdma_buffer base
diff --git a/ep/include/rdma.hpp b/ep/include/rdma.hpp
index 76ddc0626..6f229d2aa 100644
--- a/ep/include/rdma.hpp
+++ b/ep/include/rdma.hpp
@@ -26,6 +26,11 @@ struct RDMAConnectionInfo {
   uint16_t lid;     // Local ID
   uint8_t gid[16];  // Global ID for RoCE (optional)
 
+  // Atomic buffer info (separate from the main GPU buffer)
+  uint32_t atomic_buffer_rkey = 0;   // Atomic buffer memory region key
+  uintptr_t atomic_buffer_addr = 0;  // Atomic buffer address
+  uint64_t atomic_buffer_len = 0;    // Atomic buffer length
+
   // #ifdef EFA
   uint32_t num_rings;
   uint32_t data_qp_num[kChannelPerProxy];
@@ -283,13 +288,14 @@ struct BarrierImm {
   // [28:8]=SEQ (21 bits), [7:0]=SRC_RANK
   static constexpr uint32_t kCtrlBit = 1u << 30;
   static constexpr uint32_t kAckBit = 1u << 29;
+  static constexpr uint32_t kSeqMask = 0x1FFFFFu;
   static inline bool IsAck(uint32_t imm) { return (imm & kAckBit) != 0u; }
   static inline uint32_t Pack(bool ack, uint32_t seq, uint8_t src_rank) {
     return kCtrlBit | (ack ? kAckBit : 0u) |
-           ((seq & 0x1FFFFFu) << 8)  // 21 bits for seq
+           ((seq & kSeqMask) << 8)  // 21 bits for seq
            | uint32_t(src_rank);
   }
-  static inline uint32_t Seq(uint32_t imm) { return (imm >> 8) & 0x1FFFFFu; }
+  static inline uint32_t Seq(uint32_t imm) { return (imm >> 8) & kSeqMask; }
   static inline uint8_t Rank(uint32_t imm) { return imm & 0xFFu; }
   explicit BarrierImm(uint32_t imm = 0) : value(imm) {}
   bool GetIsAck() const { return IsAck(value); }
@@ -329,7 +335,8 @@ void remote_process_completions(
     int my_rank, int num_nodes, bool use_normal_mode = false);
 void create_per_thread_qp(ProxyCtx& S, void* gpu_buffer, size_t size,
                           RDMAConnectionInfo* local_info, int rank,
-                          size_t num_rings, bool use_normal_mode);
+                          size_t num_rings, bool use_normal_mode,
+                          void* atomic_buffer_ptr = nullptr);
 ibv_cq* create_per_thread_cq(ProxyCtx& S);
 void remote_poll_completions(ProxyCtx& S, int idx, CopyRingBuffer& g_ring,
                              std::vector& ctx_by_tag,
diff --git a/ep/src/internode.cu b/ep/src/internode.cu
index 2388bd387..9c4fa7fc3 100644
--- a/ep/src/internode.cu
+++ b/ep/src/internode.cu
@@ -720,7 +720,7 @@ __global__ void __launch_bounds__(
     // Read RDMA rank existence
     uint64_t is_token_in_rank_uint64 = 0;
     if (lane_id < kNumRDMARanks) {
-      is_token_in_rank_uint64 = __ldg(reinterpret_cast(
+      is_token_in_rank_uint64 = *(reinterpret_cast(
           is_token_in_rank + token_idx * num_ranks + lane_id * NUM_MAX_NVL_PEERS));
     }
@@ -1025,10 +1025,20 @@ __global__ void __launch_bounds__(
               num_bytes_per_msg,
               translate_dst_rdma_rank(dst_rdma_rank, nvl_rank),
               channel_id,  // NOTE(MaoZiming): use channel_id for rb.
-              lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false, -1,
+              lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false,
+              // NOTE(MaoZiming): for AMD GPUs, we directly send a subsequent
+              // RDMA to update the tail. For other GPUs and EFA NICs, we use
+              // the CPU-emulated atomics, allowing us to piggyback the atomic
+              // operation on the RDMA send.
+#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
+              -1,
+#else
+              -1,
               reinterpret_cast(rdma_channel_tail.buffer(rdma_rank)) -
                   reinterpret_cast(original_atomic_buffer_ptr),
-              num_tokens_to_issue);
+              num_tokens_to_issue
+#endif
+          );
         } else {
           // Lighter fence for local RDMA rank
           memory_fence();
         }
@@ -1046,7 +1056,13 @@ __global__ void __launch_bounds__(
               translate_dst_rdma_rank(dst_rdma_rank, nvl_rank),
               channel_id,  // NOTE(MaoZiming): use channel_id for rb.
               dst_rdma_rank == rdma_rank, d2h_channel_addrs,
-              num_d2h_channel_addrs, false, -1, true);
+              num_d2h_channel_addrs, false, -1,
+#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
+              false
+#else
+              true
+#endif
+          );
         }
         __syncwarp();
       }
@@ -1178,9 +1194,6 @@ __global__ void __launch_bounds__(
           trap();
         }
       }
-#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
-      memory_fence();
-#endif
       auto src_rdma_head =
           __shfl_sync(WARP_MASK, cached_rdma_channel_head, src_rdma_rank);
       auto src_rdma_tail =
@@ -1249,13 +1262,8 @@ __global__ void __launch_bounds__(
       // Move tail index
       __syncwarp();
      if (lane_id == 0)
-#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
-        __atomic_store_n(nvl_channel_tail.buffer(), cached_nvl_channel_tail,
-                         __ATOMIC_RELEASE);
-#else
        st_release_sys_global(nvl_channel_tail.buffer(),
                              cached_nvl_channel_tail);
-#endif
     }
 
     // Retired
     __syncwarp();
@@ -2611,10 +2619,14 @@ __global__ void __launch_bounds__((kNumForwarders + 1) * WARP_SIZE, 1)
                   nvl_rank),
               channel_id,  // NOTE(MaoZiming): use channel_id for rb.
               lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false, -1,
+#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
+#else
               reinterpret_cast(
                   rdma_channel_tail.buffer(rdma_rank)) -
                   reinterpret_cast(original_atomic_buffer_ptr),
-              num_chunked_tokens);
+              num_chunked_tokens
+#endif
+          );
         } else {
           memory_fence();
         }
@@ -2630,7 +2642,13 @@ __global__ void __launch_bounds__((kNumForwarders + 1) * WARP_SIZE, 1)
                   nvl_rank),
               channel_id,  // NOTE(MaoZiming): use warp_id for rb.
               dst_rdma_rank == rdma_rank, d2h_channel_addrs,
-              num_d2h_channel_addrs, false, -1, true);
+              num_d2h_channel_addrs, false, -1,
+#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
+              false
+#else
+              true
+#endif
+          );
         }
       }
     }
diff --git a/ep/src/proxy.cpp b/ep/src/proxy.cpp
index 7248f030c..ee52ab5d2 100644
--- a/ep/src/proxy.cpp
+++ b/ep/src/proxy.cpp
@@ -45,7 +45,7 @@ LocalBarrier* map_local_barrier_shm(std::string const& name, bool* out_owner) {
     perror("shm_open(existing)");
     return nullptr;
   }
-  struct stat st {};
+  struct stat st{};
   int tries = 1000;
   while (tries-- > 0) {
     if (fstat(fd, &st) == 0 && static_cast(st.st_size) >= kSize)
@@ -176,6 +176,32 @@ void Proxy::init_common() {
           cfg_.thread_idx, cfg_.local_rank);
   pin_thread_to_numa_wrapper();
   if (!ctx_.cq) ctx_.cq = create_per_thread_cq(ctx_);
+
+  // Register atomic_buffer_ptr as a separate RDMA memory region if it was set.
+  // This must be done after the PD is initialized by per_thread_rdma_init.
+  if (atomic_buffer_ptr_ && !ctx_.atomic_buffer_mr) {
+    ctx_.atomic_buffer_mr =
+        ibv_reg_mr(ctx_.pd, atomic_buffer_ptr_, kAtomicBufferSize,
+                   IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
+#ifdef EFA
+                       IBV_ACCESS_REMOTE_READ
+#else
+                       IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC
+#endif
+        );
+
+    if (!ctx_.atomic_buffer_mr) {
+      perror("Failed to register atomic_buffer_ptr MR");
+      std::abort();
+    }
+
+    fprintf(stderr,
+            "[Proxy] Registered atomic_buffer_ptr MR: addr=0x%llx, len=%zu, "
+            "rkey=0x%x\n",
+            (unsigned long long)ctx_.atomic_buffer_mr->addr,
+            (size_t)ctx_.atomic_buffer_mr->length, ctx_.atomic_buffer_mr->rkey);
+  }
+
   if (ctxs_for_all_ranks_.empty()) {
     fprintf(stderr,
             "Error: peers metadata not set before init_common (peers_.size() "
@@ -195,6 +221,14 @@ void Proxy::init_common() {
   ctx_.atomic_old_values_buf =
       reinterpret_cast(static_cast(cfg_.gpu_buffer) +
                        cfg_.total_size - atomic_buf_size);
+  // Check alignment - only abort if not aligned (8-byte alignment is required
+  // for 64-bit atomics).
+  if ((reinterpret_cast(ctx_.atomic_old_values_buf) & 0x7) != 0) {
+    fprintf(stderr, "Atomic buffer not 8-byte aligned: 0x%llx\n",
+            (unsigned long long)reinterpret_cast(
+                ctx_.atomic_old_values_buf));
+    std::abort();
+  }
 
   int num_ranks = ctxs_for_all_ranks_.size();
   local_infos_.assign(num_ranks, RDMAConnectionInfo{});
@@ -218,6 +252,8 @@ void Proxy::init_common() {
     // NOTE(MaoZiming): each context can share the same cq, pd, mr.
     // but the qp must be different.
     c.cq = ctx_.cq;
+    // Share the atomic buffer MR with peer contexts
+    c.atomic_buffer_mr = ctx_.atomic_buffer_mr;
     if (peer == my_rank) continue;  // Skip rdma connection for intra-node.
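// Illustrative sketch (not part of the patch): the registration/exchange flow
// the init_common() hunks above implement, reduced to one helper. `pd` is an
// open ibv_pd and `atomic_buf` is the host atomic buffer; `AtomicMrInfo` is a
// hypothetical stand-in mirroring the new RDMAConnectionInfo fields. Error
// handling and the EFA special case are trimmed.
#include <infiniband/verbs.h>
#include <cstddef>
#include <cstdint>

struct AtomicMrInfo {
  uint32_t atomic_buffer_rkey = 0;
  uintptr_t atomic_buffer_addr = 0;
  uint64_t atomic_buffer_len = 0;
};

static AtomicMrInfo register_atomic_buffer(ibv_pd* pd, void* atomic_buf,
                                           size_t len) {
  // REMOTE_ATOMIC is the flag that lets peers target this MR with
  // IBV_WR_ATOMIC_FETCH_AND_ADD.
  ibv_mr* mr = ibv_reg_mr(pd, atomic_buf, len,
                          IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
                              IBV_ACCESS_REMOTE_READ |
                              IBV_ACCESS_REMOTE_ATOMIC);
  if (!mr) return {};
  // These three values are what a peer needs to build a remote atomic WR
  // against this buffer; they travel in the exchanged connection info.
  return AtomicMrInfo{mr->rkey, reinterpret_cast<uintptr_t>(mr->addr),
                      static_cast<uint64_t>(mr->length)};
}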
@@ -226,7 +262,7 @@ void Proxy::init_common() {
       continue;
     create_per_thread_qp(c, cfg_.gpu_buffer, cfg_.total_size,
                          &local_infos_[peer], my_rank, cfg_.d2h_queues.size(),
-                         cfg_.use_normal_mode);
+                         cfg_.use_normal_mode, atomic_buffer_ptr_);
     modify_qp_to_init(c);
   }
@@ -291,12 +327,24 @@ void Proxy::init_common() {
     c.remote_addr = remote_infos_[peer].addr;
     c.remote_rkey = remote_infos_[peer].rkey;
     c.remote_len = remote_infos_[peer].len;
-    if (FILE* f = fopen("/tmp/uccl_debug.txt", "a")) {
+
+    // Set the remote atomic buffer info from the exchanged connection info.
+    c.remote_atomic_buffer_addr = remote_infos_[peer].atomic_buffer_addr;
+    c.remote_atomic_buffer_len = remote_infos_[peer].atomic_buffer_len;
+    c.remote_atomic_buffer_rkey = remote_infos_[peer].atomic_buffer_rkey;
+
+    if (c.remote_atomic_buffer_addr == 0) {
       fprintf(
-          f,
-          "[PROXY_INIT] me=%d peer=%d: remote_addr=0x%lx local_buffer=0x%lx\n",
-          my_rank, peer, c.remote_addr, (uintptr_t)cfg_.gpu_buffer);
-      fclose(f);
+          stderr,
+          "[Proxy] WARNING: Remote atomic buffer not registered for peer %d "
+          "(local atomic_buffer_ptr=%p, local atomic_buffer_mr=%p)\n",
+          peer, atomic_buffer_ptr_, (void*)ctx_.atomic_buffer_mr);
+    } else {
+      fprintf(stderr,
+              "[Proxy] Remote atomic buffer info for peer %d: addr=0x%llx, "
+              "len=%zu, rkey=0x%x\n",
+              peer, (unsigned long long)c.remote_atomic_buffer_addr,
+              (size_t)c.remote_atomic_buffer_len, c.remote_atomic_buffer_rkey);
     }
   }
   usleep(50 * 1000);
@@ -456,7 +504,7 @@ void Proxy::run_dual() {
 
 void Proxy::notify_gpu_completion(uint64_t& my_tail) {
   if (acked_wrs_.empty()) return;
-    // Mark all acked command slots in each ring's bitmask
+  // Mark all acked command slots in each ring's bitmask
 #ifdef USE_MSCCLPP_FIFO_BACKEND
   // FIFO path: pop in order using the pending deque and the completion set.
   for (size_t rb_idx = 0; rb_idx < cfg_.d2h_queues.size(); ++rb_idx) {
@@ -1088,7 +1136,7 @@ void Proxy::send_barrier(uint64_t wr) {
 #endif
   assert(ctx_.barrier_wr == -1 && "barrier_wr should be 0");
   ctx_.barrier_wr = wr;
-  ctx_.barrier_seq = ctx_.barrier_seq + 1;
+  ctx_.barrier_seq = (ctx_.barrier_seq + 1) & BarrierImm::kSeqMask;
 
   if (cfg_.rank == ctx_.node_leader_rank) {
     if (ctx_.barrier_arrived.size() != static_cast(cfg_.num_nodes)) {
@@ -1166,7 +1214,9 @@ void Proxy::barrier_check() {
   }
 
   // When the global release comes back (the CQ handler should set these):
-  if (ctx_.barrier_released && ctx_.barrier_release_seq == seq) {
+  // NOTE: BarrierImm's seq field is 21 bits, so we must mask the local seq.
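  // Illustrative check (not part of the patch; it uses only the BarrierImm
  // helpers added in rdma.hpp): a free-running local counter survives the
  // 21-bit imm round trip only after masking, which is why send_barrier() now
  // advances barrier_seq with "& BarrierImm::kSeqMask".
  //   uint32_t local_seq = 0x20000005u;  // wider than 21 bits
  //   uint32_t imm = BarrierImm::Pack(false, local_seq, /*src_rank=*/3);
  //   assert(BarrierImm::Seq(imm) == (local_seq & BarrierImm::kSeqMask));
  //   assert(BarrierImm::Seq(imm) != local_seq);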
+  if (ctx_.barrier_released &&
+      ctx_.barrier_release_seq == seq) {
     // Reset local mask for next barrier and consume the global release
     ctx_.barrier_released = false;
diff --git a/ep/src/rdma.cpp b/ep/src/rdma.cpp
index 78a66c220..1f9ee21b1 100644
--- a/ep/src/rdma.cpp
+++ b/ep/src/rdma.cpp
@@ -229,12 +229,11 @@ void per_thread_rdma_init(ProxyCtx& S, void* gpu_buf, size_t bytes, int rank,
 #ifndef EFA
   S.mr = ibv_reg_mr_iova2(S.pd, gpu_buf, bytes, iova,
                           IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
-                              IBV_ACCESS_REMOTE_ATOMIC |
-                              IBV_ACCESS_RELAXED_ORDERING);
+                              IBV_ACCESS_REMOTE_ATOMIC);
 #else
   S.mr = ibv_reg_mr_iova2(S.pd, gpu_buf, bytes, iova,
-                          IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
-                              IBV_ACCESS_RELAXED_ORDERING);
+                          IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
+                              IBV_ACCESS_RELAXED_ORDERING);
 #endif
 
   if (!S.mr) {
@@ -352,7 +351,8 @@ struct ibv_qp* create_srd_qp_ex(ProxyCtx& S) {
 
 void create_per_thread_qp(ProxyCtx& S, void* gpu_buffer, size_t size,
                           RDMAConnectionInfo* local_info, int rank,
-                          size_t num_rings, bool use_normal_mode) {
+                          size_t num_rings, bool use_normal_mode,
+                          void* atomic_buffer_ptr) {
   if (S.qp) return;  // Already initialized for this thread
   if (S.ack_qp) return;
   if (S.recv_ack_qp) return;
@@ -430,6 +430,30 @@ void create_per_thread_qp(ProxyCtx& S, void* gpu_buffer, size_t size,
   local_info->len = size;
   local_info->psn = 0;
   local_info->ack_psn = 0;
+
+  // Populate the atomic buffer info if available.
+  // Use S.atomic_buffer_mr if it exists (even if atomic_buffer_ptr is nullptr
+  // for this thread). This ensures all threads exchange the same atomic
+  // buffer info.
+  if (S.atomic_buffer_mr) {
+#ifdef EFA
+    assert(false && "This path should not happen for EFA");
+#endif
+    local_info->atomic_buffer_rkey = S.atomic_buffer_mr->rkey;
+    local_info->atomic_buffer_addr =
+        reinterpret_cast(S.atomic_buffer_mr->addr);
+    local_info->atomic_buffer_len = S.atomic_buffer_mr->length;
+    fprintf(stderr,
+            "[create_per_thread_qp] Populated atomic buffer info: addr=0x%llx, "
+            "len=%zu, rkey=0x%x\n",
+            (unsigned long long)local_info->atomic_buffer_addr,
+            (size_t)local_info->atomic_buffer_len,
+            local_info->atomic_buffer_rkey);
+  } else {
+    // TODO(MaoZiming): Only for the non-EFA case.
+    assert(false && "Atomic buffer is not registered");
+  }
+
   fill_local_gid(S, local_info);
 }
@@ -849,6 +873,126 @@ static void post_rdma_async_batched_normal_mode(
             dst_rank, strerror(ret), ret);
     std::abort();
   }
+#elif defined(SOFTWARE_ORDERING)
+  {
+    size_t const local_ring_count = ctx->data_qps_by_channel.size();
+    struct ibv_qp* qp =
+        local_ring_count
+            ? ctx->data_qps_by_channel[ring_idx_raw % local_ring_count]
+            : ctx->ack_qp;
+
+    size_t const kgroup = idxs.size();
+    std::vector sges(kgroup);
+    std::vector wrs(kgroup);
+    std::vector ring_wrids;
+    ring_wrids.reserve(kgroup);
+
+    for (size_t j = 0; j < kgroup; ++j) {
+      size_t i = idxs[j];
+      auto const& cmd = cmds_to_post[i];
+      ring_wrids.push_back(wrs_to_post[i]);
+
+      // Remote address bounds check
+      uint64_t remote_addr =
+          ctx->remote_addr + (cmd.req_rptr ? cmd.req_rptr : 0);
+      uint64_t remote_end = ctx->remote_addr + ctx->remote_len;
+
+      if (remote_addr < ctx->remote_addr ||
+          remote_addr + cmd.bytes > remote_end) {
+        fprintf(
+            stderr,
+            "[ERROR] Remote write OOB: addr=0x%llx len=%u (base=0x%llx, "
+            "size=%zu), cmd.req_rptr: 0x%llx\n",
+            (unsigned long long)remote_addr, cmd.bytes,
+            (unsigned long long)ctx->remote_addr, (size_t)ctx->remote_len,
+            (unsigned long long)cmd.req_rptr);
+        cudaError_t err = cudaDeviceSynchronize();
+        if (err != cudaSuccess) {
+          fprintf(stderr, "cudaDeviceSynchronize failed: %s\n",
+                  cudaGetErrorString(err));
+        }
+        std::abort();
+      }
+
+      // Local SGE
+      uintptr_t laddr =
+          cmd.req_lptr + reinterpret_cast(ctx->mr->addr);
+      sges[j] = {
+          .addr = laddr,
+          .length = static_cast(cmd.bytes),
+          .lkey = ctx->mr->lkey,
+      };
+
+      // Build WR
+      std::memset(&wrs[j], 0, sizeof(wrs[j]));
+      wrs[j].wr_id = wrs_to_post[i];
+      wrs[j].sg_list = &sges[j];
+      wrs[j].num_sge = 1;
+      wrs[j].wr.rdma.remote_addr = remote_addr;
+      wrs[j].wr.rdma.rkey = ctx->remote_rkey;
+      wrs[j].opcode = IBV_WR_RDMA_WRITE;  // default
+      wrs[j].send_flags = (j + 1 == kgroup) ? IBV_SEND_SIGNALED : 0;
+      wrs[j].next = (j + 1 < kgroup) ? &wrs[j + 1] : nullptr;
+
+      if (cmd.atomic_offset > 0 && cmd.atomic_val > 0) {
+        int v = static_cast(cmd.atomic_val);
+        if (v < -kMaxSendAtomicValue || v > kMaxSendAtomicValue) {
+          fprintf(stderr, "atomic value=%d won't fit in 15 bits\n", v);
+          std::abort();
+        }
+        size_t index = static_cast(cmd.atomic_offset / sizeof(int));
+        // Initialize missing entries lazily
+        auto key = ctx->seq_key(dst_rank, index);
+        if (ctx->next_seq_per_index.find(key) ==
+            ctx->next_seq_per_index.end())
+          ctx->next_seq_per_index[key] = 0;
+
+        uint8_t seq = ctx->next_seq_per_index[key];
+        ctx->next_seq_per_index[key] =
+            (seq + 1) % kReorderingBufferSize;  // 4-bit wrap (0–15)
+        uint32_t imm =
+            AtomicsImm::PackAtomicWithSeq(v, cmd.atomic_offset, seq, true)
+                .GetImmData();
+        AtomicsImm aimm(imm);
+        assert(aimm.GetSeq() == seq);
+
+        wrs[j].opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+        wrs[j].imm_data = htonl(imm);
+
+        assert(aimm.GetValue() == cmd.atomic_val);
+        assert(aimm.GetOff() == cmd.atomic_offset);
+      } else {
+        wrs[j].opcode = IBV_WR_RDMA_WRITE;
+      }
+    }
+
+    // Post the chain
+    ibv_send_wr* bad = nullptr;
+    int ret = ibv_post_send(qp, &wrs[0], &bad);
+    if (ret) {
+      fprintf(stderr, "ibv_post_send failed (dst=%d): %s (ret=%d)\n",
+              dst_rank, strerror(ret), ret);
+      if (bad)
+        fprintf(stderr, "Bad WR at %p (wr_id=%lu)\n", (void*)bad,
+                bad->wr_id);
+      std::abort();
+    }
+
+    // Track wr_id mappings for SOFTWARE_ORDERING
+    size_t const last = kgroup - 1;
+    uint64_t const batch_tail_wr = ring_wrids[last];
+    {
+      auto [it, inserted] = S.wr_id_to_wr_ids.try_emplace(
+          batch_tail_wr, std::move(ring_wrids));
+      if (!inserted) {
+        fprintf(stderr,
+                "thread_idx: %d, Error: tail wr_id %lu already exists "
+                "(map=%p)\n",
+                thread_idx, batch_tail_wr, (void*)&S.wr_id_to_wr_ids);
+        std::abort();
+      }
+    }
+  }
 #else
   {
     size_t const local_ring_count = ctx->data_qps_by_channel.size();
@@ -1284,9 +1428,18 @@ void local_process_completions(ProxyCtx& S,
         break;
       case IBV_WC_FETCH_ADD: {
         uint64_t wrid = wc[i].wr_id;
-        printf("Local thread %d: atomic completed (wr_id=0x%lx)\n", thread_idx,
-               wrid);
-        assert(false && "Atomic not expected on local proxy");
+        auto it = S.wr_id_to_wr_ids.find(wrid);
+        if (it != S.wr_id_to_wr_ids.end()) {
+          for (uint64_t sub_wr : it->second) {
+            acked_wrs.insert(sub_wr);
+          }
+          S.wr_id_to_wr_ids.erase(it);
+        } else {
+          fprintf(stderr,
+                  "[Atomic] No batch found for wr_id=0x%lx, treating as single "
+                  "(map_size=%zu)\n",
+                  wrid, S.wr_id_to_wr_ids.size());
+        }
       } break;
       default:
        break;
@@ -1423,7 +1576,6 @@ void remote_process_completions_normal_mode(
             int value = aimm.GetValue();
             uint32_t offset = aimm.GetOff();
             size_t index = offset / sizeof(int);
-
             auto* addr32 =
                 reinterpret_cast*>(atomic_buffer_ptr) + index;
@@ -1432,6 +1584,10 @@ void remote_process_completions_normal_mode(
             if (!aimm.IsReorderable()) {
               addr32->fetch_add(value, std::memory_order_release);
             } else {
+#ifndef EFA
+              assert(false &&
+                     "Reorderable atomic operations should not be triggered");
+#endif
               struct SeqBuf {
                 uint8_t expected = 0;       // next seq expected
                 uint16_t present_mask = 0;  // bitmask of buffered seqs
@@ -2065,7 +2221,7 @@ static void post_atomic_operations_normal_mode(
       group_wrids.push_back(wr_id);
 
       int v = static_cast(cmd.value);
-      if (v > kLargeAtomicValue) v = kMaxSendAtomicValue;  // saturate for imm
+      if (v > kLargeAtomicValue) v = kMaxSendAtomicValue;
       if (v < -kMaxSendAtomicValue || v > kMaxSendAtomicValue) {
         fprintf(stderr,
                 "value=%d (cmd.value=%lu) won't fit in 15 bits for imm; "
@@ -2277,6 +2433,213 @@ static void post_atomic_operations_fast_mode(
   }
 }
 
+// Native RDMA implementation (non-EFA)
+static void post_atomic_operations_native_rdma(
+    ProxyCtx& S, std::vector const& wrs_to_post,
+    std::vector const& cmds_to_post,
+    std::vector>& ctxs, int my_rank, int thread_idx,
+    std::unordered_set& acked_wrs) {
+  if (cmds_to_post.size() > ProxyCtx::kMaxAtomicOps) {
+    fprintf(stderr, "Too many atomic operations: %zu > %zu\n",
+            cmds_to_post.size(), ProxyCtx::kMaxAtomicOps);
+    std::abort();
+  }
+
+  std::unordered_map> dst_rank_wr_ids;
+  dst_rank_wr_ids.reserve(cmds_to_post.size());
+  for (size_t i = 0; i < wrs_to_post.size(); ++i) {
+    int dst = static_cast(cmds_to_post[i].dst_rank);
+    if (dst == my_rank) {
+      fprintf(stderr, "Posting atomic to itself\n");
+      std::abort();
+    }
+    dst_rank_wr_ids[dst].push_back(i);
+  }
+
+  for (auto& [dst_rank, wr_ids] : dst_rank_wr_ids) {
+    if (wr_ids.empty()) continue;
+
+    ProxyCtx* ctx = ctxs[dst_rank].get();
+    size_t const k = wr_ids.size();
+    // Group by ring index (upper 32 bits in wrs_to_post)
+    std::unordered_map> ring_to_indices;
+    ring_to_indices.reserve(k);
+    for (size_t ii = 0; ii < k; ++ii) {
+      size_t global_i = wr_ids[ii];
+      size_t ring_idx =
+          static_cast((wrs_to_post[global_i] >> 32) & 0xFFFFFFFFu);
+      ring_to_indices[ring_idx].push_back(global_i);
+    }
+
+    for (auto& [ring_idx_raw, idxs] : ring_to_indices) {
+      size_t const local_ring_count = ctx->data_qps_by_channel.size();
+      struct ibv_qp* qp =
+          local_ring_count
+              ? ctx->data_qps_by_channel[ring_idx_raw % local_ring_count]
+              : ctx->ack_qp;
+
+      size_t const k = idxs.size();
+      std::vector sge(k);
+      std::vector wr(k);
+      std::vector group_wrids;
+      group_wrids.reserve(k);
+
+      // Verify the atomic buffer is properly aligned (8-byte for 64-bit
+      // atomics)
+      uintptr_t atomic_buf_addr =
+          reinterpret_cast(S.atomic_old_values_buf);
+      if ((atomic_buf_addr & 0x7) != 0) {
+        fprintf(
+            stderr,
+            "[Native RDMA] atomic_old_values_buf not 8-byte aligned: 0x%llx\n",
+            (unsigned long long)atomic_buf_addr);
+        std::abort();
+      }
+
+      // Verify the atomic buffer is within the registered memory region
+      uintptr_t mr_addr = reinterpret_cast(S.mr->addr);
+      uintptr_t mr_end = mr_addr + S.mr->length;
+      uintptr_t atomic_buf_end = atomic_buf_addr + k * sizeof(uint64_t);
+      if (atomic_buf_addr < mr_addr || atomic_buf_end > mr_end) {
+        fprintf(stderr,
                "[Native RDMA] atomic buffer out of bounds: buf=0x%llx-0x%llx, "
+                "mr=0x%llx-0x%llx\n",
+                (unsigned long long)atomic_buf_addr,
+                (unsigned long long)atomic_buf_end, (unsigned long long)mr_addr,
+                (unsigned long long)mr_end);
+        std::abort();
+      }
+
+      uint64_t* atomic_old_values_64 =
+          reinterpret_cast(S.atomic_old_values_buf);
+
+      for (size_t t = 0; t < k; ++t) {
+        size_t i = idxs[t];
+        auto const& cmd = cmds_to_post[i];
+        uint64_t const wr_id = wrs_to_post[i];
+        group_wrids.push_back(wr_id);
+
+        int v = static_cast(cmd.value);
+        if (v == kLargeAtomicValue) v = kMaxSendAtomicValue;
+
+        // Convert the 32-bit signed int to 64 bits for RDMA atomics;
+        // IBV_WR_ATOMIC_FETCH_AND_ADD requires 64-bit operations.
+        int64_t v64 = static_cast(static_cast(v));
+
+        // Calculate the remote address - it must be 8-byte aligned for 64-bit
+        // RDMA atomics. cmd.req_rptr is an offset relative to atomic_base_addr
+        // (the local atomic_buffer_ptr). Use the remote atomic buffer address
+        // if available; otherwise fall back to remote_addr.
+        if ((cmd.req_rptr & 0x7) != 0) {
+          fprintf(stderr, "[Native RDMA] req_rptr not 8-byte aligned: 0x%x\n",
+                  cmd.req_rptr);
+          std::abort();
+        }
+        uint64_t remote_atomic_addr;
+        if (ctx->remote_atomic_buffer_addr != 0) {
+          // Use the registered atomic buffer
+          remote_atomic_addr = ctx->remote_atomic_buffer_addr + cmd.req_rptr;
+        } else {
+          assert(false && "Atomic buffer is not registered");
+        }
+
+        // Verify the final address alignment (should always hold if req_rptr
+        // is aligned)
+        assert((remote_atomic_addr & 0x7) == 0 &&
+               "Remote atomic address must be 8-byte aligned");
+
+        // Verify the remote address is within bounds of the atomic buffer
+        if (ctx->remote_atomic_buffer_addr == 0) {
+          fprintf(stderr,
+                  "[Native RDMA] Remote atomic buffer not registered\n");
+          std::abort();
+        }
+        if (remote_atomic_addr < ctx->remote_atomic_buffer_addr ||
+            remote_atomic_addr + sizeof(uint64_t) >
+                ctx->remote_atomic_buffer_addr +
+                    ctx->remote_atomic_buffer_len) {
+          fprintf(stderr,
+                  "[Native RDMA] Remote atomic address out of bounds: 0x%llx "
+                  "(base=0x%llx, len=%zu)\n",
+                  (unsigned long long)remote_atomic_addr,
+                  (unsigned long long)ctx->remote_atomic_buffer_addr,
+                  (size_t)ctx->remote_atomic_buffer_len);
+          std::abort();
+        }
+
+        // Local SGE: point to the local buffer where the old value will be
+        // stored (64-bit). Use local context S's memory region, not the
+        // destination ctx.
+        uintptr_t local_addr =
+            reinterpret_cast(&atomic_old_values_64[t]);
+        // Double-check the address is within bounds
+        if (local_addr < mr_addr || local_addr + sizeof(uint64_t) > mr_end) {
+          fprintf(stderr,
+                  "[Native RDMA] Local atomic address out of bounds: 0x%llx\n",
+                  (unsigned long long)local_addr);
+          std::abort();
+        }
+
+        sge[t].addr = local_addr;
+        sge[t].length = sizeof(uint64_t);
+        sge[t].lkey = S.mr->lkey;
+
+        std::memset(&wr[t], 0, sizeof(wr[t]));
+        wr[t].wr_id = wr_id;
+        wr[t].opcode = IBV_WR_ATOMIC_FETCH_AND_ADD;
+        wr[t].send_flags = (t + 1 == k) ? IBV_SEND_SIGNALED : 0;
+        wr[t].sg_list = &sge[t];
+        wr[t].num_sge = 1;
+        wr[t].wr.atomic.remote_addr = remote_atomic_addr;
+        // Use the remote atomic buffer rkey if available, otherwise the main
+        // buffer rkey
+        assert(ctx->remote_atomic_buffer_rkey != 0);
+        wr[t].wr.atomic.rkey = ctx->remote_atomic_buffer_rkey;
+        wr[t].wr.atomic.compare_add = v64;  // 64-bit value to add
+        wr[t].next = (t + 1 < k) ? &wr[t + 1] : nullptr;
+      }
+
+      ibv_send_wr* bad = nullptr;
+      int ret = ibv_post_send(qp, &wr[0], &bad);
+      if (ret) {
+        fprintf(stderr, "[Native RDMA] ibv_post_send(atomic) failed: %d (%s)\n",
+                ret, strerror(ret));
+        if (bad) {
+          fprintf(stderr, "  bad wr_id=0x%llx opcode=%u\n",
+                  (unsigned long long)bad->wr_id, bad->opcode);
+        }
+        std::abort();
+      }
+      // The completion will have wr_id = wr[k-1].wr_id (the last signaled WR).
+      // Store this exact value so the lookup matches.
+      uint64_t const batch_tail_wr = wr[k - 1].wr_id;
+      // Verify this matches group_wrids.back() (should always be true)
+      if (batch_tail_wr != group_wrids.back()) {
+        fprintf(
+            stderr,
+            "[Native RDMA] ERROR: batch_tail_wr (0x%lx) != group_wrids.back() "
+            "(0x%lx)\n",
+            batch_tail_wr, group_wrids.back());
+        std::abort();
+      }
+      {
+        auto [it, inserted] = S.wr_id_to_wr_ids.try_emplace(
+            batch_tail_wr, std::move(group_wrids));
+
+        // printf("[Native RDMA] batch_tail_wr: 0x%lx, map_size: %zu, dst_rank:
+        // %d\n", batch_tail_wr, it->second.size(), dst_rank);
+        if (!inserted) {
+          fprintf(stderr,
+                  "thread_idx: %d, Error: tail wr_id %lu already exists "
+                  "(map=%p, "
+                  "size=%zu, dst_rank=%d)\n",
+                  thread_idx, batch_tail_wr, (void*)&S.wr_id_to_wr_ids,
+                  S.wr_id_to_wr_ids.size(), dst_rank);
+          std::abort();
+        }
+      }
+    }
+  }
+}
+
 // Wrapper that selects implementation based on use_normal_mode
 void post_atomic_operations(ProxyCtx& S,
                             std::vector const& wrs_to_post,
@@ -2286,8 +2649,13 @@ void post_atomic_operations(ProxyCtx& S,
                             std::unordered_set& acked_wrs,
                             bool use_normal_mode) {
   if (use_normal_mode) {
+#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
+    post_atomic_operations_native_rdma(S, wrs_to_post, cmds_to_post, ctxs,
+                                       my_rank, thread_idx, acked_wrs);
+#else
     post_atomic_operations_normal_mode(S, wrs_to_post, cmds_to_post, ctxs,
                                        my_rank, thread_idx, acked_wrs);
+#endif
   } else {
     post_atomic_operations_fast_mode(S, wrs_to_post, cmds_to_post, ctxs,
                                      my_rank, thread_idx, acked_wrs);
diff --git a/ep/src/uccl_ep.cc b/ep/src/uccl_ep.cc
index 5ba63839e..2d07caf89 100644
--- a/ep/src/uccl_ep.cc
+++ b/ep/src/uccl_ep.cc
@@ -138,10 +138,6 @@ class Buffer {
 #endif
     }
 
-#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
-    // Note(huangzhen): It will make d_handles turn to nullptr in rocm7.0,
-    // so we don't prefetch d_handles.
-#else
     // Prefetch so the device immediately sees initialized contents
     CUDA_CHECK(cudaMemPrefetchAsync(
         d_handle_objs, num_d2h_channel_addrs * sizeof(d2hq::D2HHandle),
         device_index));
     CUDA_CHECK(cudaMemPrefetchAsync(
         d_handles, num_d2h_channel_addrs * sizeof(uint64_t), device_index));
     CUDA_CHECK(cudaDeviceSynchronize());
-#endif
   }
 
   // Allocate device memory for IPC base pointers
   CUDA_CHECK(
diff --git a/ep/src/uccl_proxy.cpp b/ep/src/uccl_proxy.cpp
index 53bbd00fa..a87410386 100644
--- a/ep/src/uccl_proxy.cpp
+++ b/ep/src/uccl_proxy.cpp
@@ -58,9 +58,8 @@ UcclProxy::UcclProxy(int thread_idx, uintptr_t gpu_buffer_addr,
 #ifdef USE_GRACE_HOPPER
   cudaMallocManaged(&atomic_buffer_ptr_, kAtomicBufferSize);
 #elif defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
-  cudaHostAlloc(&atomic_buffer_ptr_, kAtomicBufferSize,
-                cudaHostAllocMapped | cudaHostAllocWriteCombined |
-                    hipHostMallocUncached);
+  hipExtMallocWithFlags(&atomic_buffer_ptr_, kAtomicBufferSize,
+                        hipDeviceMallocUncached);
 #else
   cudaHostAlloc(&atomic_buffer_ptr_, kAtomicBufferSize,
                 cudaHostAllocMapped | cudaHostAllocWriteCombined);
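// Illustrative sketch (not part of the patch): the per-platform allocation
// choice the uccl_proxy.cpp hunk above makes, isolated into one helper.
// `alloc_atomic_buffer` is a hypothetical name; the byte count would be the
// caller's kAtomicBufferSize. Assumes a ROCm version that provides
// hipDeviceMallocUncached; error checking is trimmed.
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#include <hip/hip_runtime.h>
#else
#include <cuda_runtime.h>
#endif
#include <cstddef>

static void* alloc_atomic_buffer(std::size_t bytes) {
  void* ptr = nullptr;
#if defined(USE_GRACE_HOPPER)
  // Managed memory: the CPU proxy and the GPU share one coherent allocation.
  cudaMallocManaged(&ptr, bytes);
#elif defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
  // Uncached device memory on ROCm, replacing the previous mapped host
  // allocation, so NIC-issued fetch-and-adds land in device-visible memory.
  hipExtMallocWithFlags(&ptr, bytes, hipDeviceMallocUncached);
#else
  // Host-pinned, mapped memory: the CPU proxy emulates the atomics here.
  cudaHostAlloc(&ptr, bytes, cudaHostAllocMapped | cudaHostAllocWriteCombined);
#endif
  return ptr;
}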