Draft
26 commits
3340d74
ep debug mem consistency issues
MaoZiming Dec 23, 2025
ce7ab42
nit
MaoZiming Dec 23, 2025
4d37b7c
used rdma atomic + cudamalloc for atomic on AMD non-EFA
MaoZiming Dec 27, 2025
565051c
revert formatting python
MaoZiming Dec 27, 2025
f0f15c8
revert formatting other python
MaoZiming Dec 27, 2025
71062d0
Minor change
MaoZiming Dec 27, 2025
5ba543c
revert some changes to rdma.cpp
MaoZiming Dec 27, 2025
760ebdb
wrap post_atomic_operations_native_rdma under ifdef AMD
MaoZiming Dec 27, 2025
b7132e5
format
MaoZiming Dec 27, 2025
f77006e
Merge branch 'main' into ep-debug-amd-mem-consistency
MaoZiming Dec 27, 2025
47f4396
combine
MaoZiming Dec 28, 2025
fb1f535
remove relaxed ordering flag
MaoZiming Dec 30, 2025
aa23177
change atomic buffer cudaMalloc to hipExtMallocWithFlags with uncache…
zhenhuang12 Jan 2, 2026
702c922
debug
MaoZiming Jan 3, 2026
63b2203
efa stability
MaoZiming Jan 3, 2026
7aea584
Merge branch 'ep-efa-stability' into ep-debug-amd-mem-consistency-debug
MaoZiming Jan 3, 2026
ca8a9e8
use uint64_t
MaoZiming Jan 3, 2026
9f1bee7
mask seq
MaoZiming Jan 3, 2026
bfbadc1
fix
MaoZiming Jan 3, 2026
7f1da83
fix nits
MaoZiming Jan 3, 2026
a69de86
Merge branch 'ep-efa-stability' of https://github.com/uccl-project/uc…
MaoZiming Jan 3, 2026
46d8835
Merge branch 'ep-efa-stability' into ep-debug-amd-mem-consistency-debug
MaoZiming Jan 3, 2026
e8e4849
try out software ordering
MaoZiming Jan 4, 2026
9400843
quiet to one thread
MaoZiming Jan 4, 2026
f75c1e2
update software ordering
MaoZiming Jan 4, 2026
8813477
check
MaoZiming Jan 6, 2026
2 changes: 1 addition & 1 deletion ep/bench/test_internode.py
@@ -502,7 +502,7 @@ def test_loop(

assert num_local_ranks == 8 and num_ranks > 8

for seed in range(int(1e9)):
for seed in range(650, int(1e9)):
if local_rank == 0:
print(f"Testing with seed {seed} ...", flush=True)
torch.manual_seed(rank + seed)
12 changes: 12 additions & 0 deletions ep/bench/utils.py
@@ -584,6 +584,18 @@ def initialize_uccl(

ep.register_proxies(local_rank, proxies)

# Set atomic buffer pointer for all proxies BEFORE starting them
# This ensures the atomic buffer info is included in connection info exchange
# Note: Only thread 0's proxy allocates the atomic buffer in its constructor
if not is_intranode and len(proxies) > 0:
# Get atomic buffer pointer from thread 0 proxy (only thread 0 allocates it)
# This must be done before start_dual() so the atomic buffer info is included
# in the connection info exchange during init_common()
atomic_buffer_ptr = proxies[0].get_atomic_buffer_ptr()
if atomic_buffer_ptr:
for proxy in proxies:
proxy.set_atomic_buffer_ptr(atomic_buffer_ptr)

dist.barrier(group)
if not is_intranode:
for proxy in proxies:
4 changes: 2 additions & 2 deletions ep/include/barrier_local.hpp
@@ -10,6 +10,6 @@ struct LocalBarrier {
std::atomic<uint64_t> arrive_seq[UCCL_MAX_LOCAL_RANKS];
std::atomic<uint64_t> release_seq[UCCL_MAX_LOCAL_RANKS];
std::atomic<uint64_t> seq;
std::atomic<uint64_t> full_mask; // unchanged; still used for size/info
std::atomic<uint64_t> arrived_mask; // optional: keep only for debug prints
std::atomic<uint64_t> full_mask;
std::atomic<uint64_t> arrived_mask;
};
15 changes: 15 additions & 0 deletions ep/include/common.hpp
@@ -12,6 +12,7 @@
#include <stdio.h>
#include <unistd.h>

// #define SOFTWARE_ORDERING
#define MAX_IB_DEVS 32
// #define MEASURE_PER_OP_LATENCY
// #define MEASURE_PER_VERB_LATENCY
@@ -61,6 +62,20 @@ extern bool use_ll_sl;
#define kBarrierWrTag 0xbaba000000000000ULL
#define kBarrierMask 0x0000FFFFFFFFFFFFULL
#define kPrintCycleInterval 100000000000ULL
#define kRingIdxBits 10
#define kRingIdxMask ((1ULL << kRingIdxBits) - 1ULL)
#define kRingIdxShift kRingIdxBits

// WR ID helpers for ring-indexed work requests:
// - lower kRingIdxBits bits: ring index
// - upper bits: sequence / command index
inline uint64_t make_ring_wr_id(uint64_t seq, uint64_t ring_idx) {
return (seq << kRingIdxBits) | (ring_idx & kRingIdxMask);
}
inline uint64_t ring_wr_seq(uint64_t wrid) { return wrid >> kRingIdxBits; }
inline uint32_t ring_wr_idx(uint64_t wrid) {
return static_cast<uint32_t>(wrid & kRingIdxMask);
}
#define MAX_RETRIES 100
#define RETRY_DELAY_MS 50
#define QKEY 0x11111111u
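The WR ID helpers above pack two fields into one 64-bit ibv work-request ID: the low kRingIdxBits (10) bits hold the ring slot and the remaining high bits hold the command sequence number. A minimal host-side sketch of the round trip, restating the helpers from common.hpp with illustrative values:

#include <cassert>
#include <cstdint>

constexpr int kRingIdxBits = 10;
constexpr uint64_t kRingIdxMask = (1ULL << kRingIdxBits) - 1ULL;

inline uint64_t make_ring_wr_id(uint64_t seq, uint64_t ring_idx) {
  return (seq << kRingIdxBits) | (ring_idx & kRingIdxMask);
}
inline uint64_t ring_wr_seq(uint64_t wrid) { return wrid >> kRingIdxBits; }
inline uint32_t ring_wr_idx(uint64_t wrid) {
  return static_cast<uint32_t>(wrid & kRingIdxMask);
}

int main() {
  // Pack sequence 42 destined for ring slot 7, then recover both fields the
  // way a completion handler would from wc.wr_id.
  uint64_t wrid = make_ring_wr_id(/*seq=*/42, /*ring_idx=*/7);
  assert(ring_wr_seq(wrid) == 42);
  assert(ring_wr_idx(wrid) == 7);
  // Only the low 10 bits of the ring index are kept, so slot numbers wrap at
  // 1024; the sequence field occupies the remaining 54 bits.
  assert(ring_wr_idx(make_ring_wr_id(1, 1024)) == 0);
  return 0;
}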
20 changes: 13 additions & 7 deletions ep/include/ep_utils.cuh
@@ -821,7 +821,7 @@ __forceinline__ __device__ int atomic_exch_cta_release(int* addr, int x) {
return ret;
}

template <int kNumRanks, bool kSyncOnly = false>
template <int kNumRanks, bool kSyncOnly = false, int label = 0>
__forceinline__ __device__ void barrier_block(int** barrier_signal_ptrs,
int rank) {
auto thread_id = static_cast<int>(threadIdx.x);
@@ -849,10 +849,17 @@ __forceinline__ __device__ void barrier_block(int** barrier_signal_ptrs,
if (__all_sync(WARP_MASK, value <= 0)) break;

if (clock64() - start_time > NUM_TIMEOUT_CYCLES and thread_id < kNumRanks) {
printf(
"DeepEP timeout check failed: rank = %d, thread = %d, value = "
"%d)\n",
rank, thread_id, value);
if (label == 0) {
printf(
"DeepEP timeout check failed: rank = %d, thread = %d, value = "
"%d)\n",
rank, thread_id, value);
} else {
printf(
"DeepEP timeout check failed: rank = %d, thread = %d, value = %d, "
"label = %d)\n",
rank, thread_id, value, label);
}
trap();
}
}
@@ -919,8 +926,7 @@ __device__ __forceinline__ int ld_acquire_cta(int const* ptr) {
__forceinline__ __device__ void acquire_lock(int* mutex) {
// To make later memory operations valid, we must use `acquire` for memory
// semantics
while (atomic_cas_cta_acquire(mutex, 0, 1) != 0)
;
while (atomic_cas_cta_acquire(mutex, 0, 1) != 0);
}

__forceinline__ __device__ void release_lock(int* mutex) {
2 changes: 1 addition & 1 deletion ep/include/proxy.hpp
@@ -94,7 +94,7 @@ class Proxy {
void init_remote();

void notify_gpu_completion(uint64_t& my_tail);
void post_gpu_command(uint64_t& my_tail, size_t& seen);
void post_gpu_command(uint64_t& my_tail, uint64_t& seen);
void post_gpu_commands_mixed(std::vector<uint64_t> const& wrs_to_post,
std::vector<TransferCmd> const& cmds_to_post);
void post_barrier_msg(int dst_rank, bool ack, uint64_t seq);
11 changes: 9 additions & 2 deletions ep/include/proxy_ctx.hpp
@@ -59,6 +59,12 @@ struct ProxyCtx {
uint32_t remote_rkey = 0;
uint32_t rkey = 0;

// Atomic buffer (separate from main RDMA buffer)
ibv_mr* atomic_buffer_mr = nullptr; // MR for local atomic_buffer_ptr
uintptr_t remote_atomic_buffer_addr = 0; // Remote atomic_buffer_ptr address
uint64_t remote_atomic_buffer_len = 0; // Remote atomic_buffer_ptr length
uint32_t remote_atomic_buffer_rkey = 0; // Remote atomic_buffer_ptr rkey

// Buffer offset within rdma_buffer for address translation
uintptr_t dispatch_recv_data_offset =
0; // offset of dispatch_rdma_recv_data_buffer from rdma_buffer base
@@ -102,7 +108,8 @@ struct ProxyCtx {

// Async-barrier state (single inflight assumed)
bool barrier_inflight = false;
uint64_t barrier_seq = 0;
// For debugging only.
uint64_t barrier_seq = 1;
int barrier_wr = -1;

bool quiet_inflight = false;
@@ -124,7 +131,7 @@ struct ProxyCtx {
int local_rank = -1; // convenience mirror of cfg_.local_rank
int thread_idx = -1; // thread index used in shm name

std::unordered_map<uint64_t, uint8_t> next_seq_per_index;
std::unordered_map<uint64_t, uint64_t> next_seq_per_index;
inline uint64_t seq_key(int dst_rank, size_t index) {
// assumes dst_rank fits 32 bits; if index > 32 bits, prefer Pair Hash below
return (static_cast<uint64_t>(static_cast<uint32_t>(dst_rank)) << 32) ^
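next_seq_per_index widens the per-(dst_rank, index) sequence counters to uint64_t, keyed by seq_key, which packs the destination rank into the high 32 bits. A standalone sketch of how such a map can gate out-of-order commands under software ordering; try_accept_in_order is a hypothetical helper, not the PR's API:

#include <cstddef>
#include <cstdint>
#include <unordered_map>

// Mirrors ProxyCtx::seq_key: dst_rank in the high 32 bits, XOR'd with the
// index (assumes dst_rank fits in 32 bits, as the comment above notes).
inline uint64_t seq_key(int dst_rank, size_t index) {
  return (static_cast<uint64_t>(static_cast<uint32_t>(dst_rank)) << 32) ^
         static_cast<uint64_t>(index);
}

// Hypothetical helper: accept a command only if its sequence number is the
// next expected one for its (dst_rank, index) stream; otherwise the caller
// defers it and retries after earlier commands have been posted.
bool try_accept_in_order(std::unordered_map<uint64_t, uint64_t>& next_seq,
                         int dst_rank, size_t index, uint64_t cmd_seq) {
  uint64_t& expected = next_seq[seq_key(dst_rank, index)];  // starts at 0
  if (cmd_seq != expected) return false;
  ++expected;
  return true;
}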
13 changes: 10 additions & 3 deletions ep/include/rdma.hpp
@@ -26,6 +26,11 @@ struct RDMAConnectionInfo {
uint16_t lid; // Local ID
uint8_t gid[16]; // Global ID for RoCE (optional)

// Atomic buffer info (separate from main GPU buffer)
uint32_t atomic_buffer_rkey = 0; // Atomic buffer memory region key
uintptr_t atomic_buffer_addr = 0; // Atomic buffer address
uint64_t atomic_buffer_len = 0; // Atomic buffer length

// #ifdef EFA
uint32_t num_rings;
uint32_t data_qp_num[kChannelPerProxy];
@@ -283,13 +288,14 @@ struct BarrierImm {
// [28:8]=SEQ (21 bits), [7:0]=SRC_RANK
static constexpr uint32_t kCtrlBit = 1u << 30;
static constexpr uint32_t kAckBit = 1u << 29;
static constexpr uint32_t kSeqMask = 0x1FFFFFu;
static inline bool IsAck(uint32_t imm) { return (imm & kAckBit) != 0u; }
static inline uint32_t Pack(bool ack, uint32_t seq, uint8_t src_rank) {
return kCtrlBit | (ack ? kAckBit : 0u) |
((seq & 0x1FFFFFu) << 8) // 21 bits for seq
((seq & kSeqMask) << 8) // 21 bits for seq
| uint32_t(src_rank);
}
static inline uint32_t Seq(uint32_t imm) { return (imm >> 8) & 0x1FFFFFu; }
static inline uint32_t Seq(uint32_t imm) { return (imm >> 8) & kSeqMask; }
static inline uint8_t Rank(uint32_t imm) { return imm & 0xFFu; }
explicit BarrierImm(uint32_t imm = 0) : value(imm) {}
bool GetIsAck() const { return IsAck(value); }
@@ -329,7 +335,8 @@ void remote_process_completions(
int my_rank, int num_nodes, bool use_normal_mode = false);
void create_per_thread_qp(ProxyCtx& S, void* gpu_buffer, size_t size,
RDMAConnectionInfo* local_info, int rank,
size_t num_rings, bool use_normal_mode);
size_t num_rings, bool use_normal_mode,
void* atomic_buffer_ptr = nullptr);
ibv_cq* create_per_thread_cq(ProxyCtx& S);
void remote_poll_completions(ProxyCtx& S, int idx, CopyRingBuffer& g_ring,
std::vector<ProxyCtx*>& ctx_by_tag,
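BarrierImm encodes the barrier control traffic in a 32-bit RDMA immediate: bit 30 flags a control message, bit 29 an ACK, bits 28:8 carry the 21-bit sequence number (now masked via kSeqMask), and bits 7:0 carry the source rank. A small round-trip check restating the packing from rdma.hpp; the concrete seq/rank values are illustrative:

#include <cassert>
#include <cstdint>

constexpr uint32_t kCtrlBit = 1u << 30;
constexpr uint32_t kAckBit = 1u << 29;
constexpr uint32_t kSeqMask = 0x1FFFFFu;  // 21 bits

uint32_t Pack(bool ack, uint32_t seq, uint8_t src_rank) {
  return kCtrlBit | (ack ? kAckBit : 0u) | ((seq & kSeqMask) << 8) |
         uint32_t(src_rank);
}
uint32_t Seq(uint32_t imm) { return (imm >> 8) & kSeqMask; }
uint8_t Rank(uint32_t imm) { return imm & 0xFFu; }
bool IsAck(uint32_t imm) { return (imm & kAckBit) != 0u; }

int main() {
  // An ACK for barrier sequence 1234 from rank 5 survives the round trip.
  uint32_t imm = Pack(/*ack=*/true, /*seq=*/1234, /*src_rank=*/5);
  assert(IsAck(imm) && Seq(imm) == 1234 && Rank(imm) == 5);
  // Sequences wrap at 2^21, which is why senders mask with kSeqMask.
  assert(Seq(Pack(false, (1u << 21) + 3u, 0)) == 3u);
  return 0;
}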
3 changes: 3 additions & 0 deletions ep/include/uccl_ibgda.cuh
@@ -320,6 +320,7 @@ __device__ static __forceinline__ void nvshmemi_ibgda_quiet(
}
}
#endif
// All proxy threads of the GPU will post the quiet command.
break;
}

@@ -369,6 +370,8 @@ __forceinline__ __device__ void nvshmem_sync_with_same_gpu_idx(
}
}
#endif
// Only one thread of the GPU posts the barrier command: once the proxy
// thread reaches the barrier, the other GPUs managed by that thread are
// guaranteed to have reached it as well.
break;
}

66 changes: 48 additions & 18 deletions ep/src/internode.cu
@@ -135,7 +135,7 @@ __global__ void notify_dispatch(
uccl::nvshmem_sync_with_same_gpu_idx(d2h_channel_addrs,
num_d2h_channel_addrs, nvl_rank);
}
barrier_block<NUM_MAX_NVL_PEERS, true>(barrier_signal_ptrs, nvl_rank);
barrier_block<NUM_MAX_NVL_PEERS, true, 1>(barrier_signal_ptrs, nvl_rank);

// Send numbers of tokens per rank/expert to RDMA ranks
auto rdma_buffer_ptr_int = static_cast<int*>(rdma_buffer_ptr);
@@ -290,7 +290,7 @@ __global__ void notify_dispatch(
nvl_send_num_tokens_per_expert.buffer(nvl_rank)[i] =
nvl_reduced_num_tokens_per_expert[thread_id * num_nvl_experts + i];
}
barrier_block<NUM_MAX_NVL_PEERS>(barrier_signal_ptrs, nvl_rank);
barrier_block<NUM_MAX_NVL_PEERS, false, 2>(barrier_signal_ptrs, nvl_rank);

// Reduce the number of tokens per rank/expert
EP_DEVICE_ASSERT(num_nvl_experts <= num_threads);
@@ -323,7 +323,7 @@
if (thread_id == WARP_SIZE)
uccl::nvshmem_sync_with_same_gpu_idx(d2h_channel_addrs,
num_d2h_channel_addrs, nvl_rank);
barrier_block<NUM_MAX_NVL_PEERS>(barrier_signal_ptrs, nvl_rank);
barrier_block<NUM_MAX_NVL_PEERS, false, 3>(barrier_signal_ptrs, nvl_rank);
} else {
// Calculate meta data
int dst_rdma_rank = sm_id - 1;
@@ -1025,10 +1025,21 @@ __global__ void __launch_bounds__(
num_bytes_per_msg,
translate_dst_rdma_rank<kLowLatencyMode>(dst_rdma_rank, nvl_rank),
channel_id, // NOTE(MaoZiming): use channel_id for rb.
lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false, -1,
lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false,
// NOTE(MaoZiming): for AMD GPUs, we directly send a subsequent RDMA
// to update the tail. For other GPUs and EFA NICs, we use the
// CPU-emulated atomics, which lets us piggyback the atomic operation
// on the RDMA send.
#if (defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)) && !defined(SOFTWARE_ORDERING)

-1
#else
-1,
reinterpret_cast<uint64_t>(rdma_channel_tail.buffer(rdma_rank)) -
reinterpret_cast<uint64_t>(original_atomic_buffer_ptr),
num_tokens_to_issue);
num_tokens_to_issue
#endif
);
} else {
// Lighter fence for local RDMA rank
memory_fence();
Expand All @@ -1046,7 +1057,13 @@ __global__ void __launch_bounds__(
translate_dst_rdma_rank<kLowLatencyMode>(dst_rdma_rank, nvl_rank),
channel_id, // NOTE(MaoZiming): use channel_id for rb.
dst_rdma_rank == rdma_rank, d2h_channel_addrs,
num_d2h_channel_addrs, false, -1, true);
num_d2h_channel_addrs, false, -1,
#if (defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)) && !defined(SOFTWARE_ORDERING)
false
#else
true
#endif
);
}
__syncwarp();
}
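The NOTE in this hunk describes two tail-advance strategies: on AMD builds without SOFTWARE_ORDERING, a second RDMA write updates the remote tail directly, while other builds hand the proxy an offset into the registered atomic buffer so the CPU-emulated atomic add piggybacks on the same send. A compile-time sketch of that dispatch; post_payload, post_tail_write, and post_payload_with_atomic are hypothetical stand-ins, not the uccl_ibgda API:

#include <cstddef>
#include <cstdint>

// Hypothetical posting helpers (no-op stubs so the sketch compiles).
inline void post_payload(void const*, void*, size_t) {}
inline void post_tail_write(uint64_t*, int) {}
inline void post_payload_with_atomic(void const*, void*, size_t,
                                     uint64_t /*atomic_offset*/, int /*add*/) {}

void send_chunk(void const* src, void* dst, size_t bytes,
                uint64_t* remote_tail, uint64_t atomic_buffer_base,
                int num_tokens) {
#if (defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)) && \
    !defined(SOFTWARE_ORDERING)
  // AMD, hardware ordering: issue the payload, then a follow-up RDMA write
  // that advances the tail; no atomic offset is handed to the proxy.
  post_payload(src, dst, bytes);
  post_tail_write(remote_tail, num_tokens);
#else
  // Other GPUs / EFA (or software ordering): pass the tail's offset inside the
  // registered atomic buffer so the proxy piggybacks an emulated fetch-add.
  uint64_t offset =
      reinterpret_cast<uint64_t>(remote_tail) - atomic_buffer_base;
  post_payload_with_atomic(src, dst, bytes, offset, num_tokens);
#endif
}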
@@ -1152,7 +1169,7 @@ __global__ void __launch_bounds__(
if (__shfl_sync(WARP_MASK, num_tokens_to_recv_from_rdma,
src_rdma_rank) > 0) {
if (lane_id == src_rdma_rank)
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#if (defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)) && !defined(SOFTWARE_ORDERING)
cached_rdma_channel_tail = static_cast<int>(__atomic_load_n(
rdma_channel_tail.buffer(src_rdma_rank), __ATOMIC_SEQ_CST));
#else
@@ -1178,7 +1195,7 @@
trap();
}
}
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#if (defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)) && defined(SOFTWARE_ORDERING)
memory_fence();
#endif
auto src_rdma_head =
@@ -1194,7 +1211,15 @@
int seen_bits = ld_nc_global(reinterpret_cast<SourceMeta*>(
shifted + hidden_bytes + scale_bytes))
.is_token_in_nvl_rank_bits;
if (seen_bits == 0) trap();
if (seen_bits == 0) {
printf("DeepEP dispatch forwarder timeout (RDMA check), channel: %d, "
"RDMA: %d, nvl: %d, dst NVL: %d, src RDMA lane: %d, head: %d, "
"tail: %d, expected: %d\n",
channel_id, rdma_rank, nvl_rank, dst_nvl_rank, lane_id,
cached_rdma_channel_head, cached_rdma_channel_tail,
num_tokens_to_recv_from_rdma);
trap();
}
lane_id == src_rdma_rank ? (num_tokens_to_recv_from_rdma -= 1) : 0;
bool is_in_dst_nvl_rank = (seen_bits >> dst_nvl_rank) & 1;
if (lane_id == src_rdma_rank) {
@@ -1249,13 +1274,8 @@
// Move tail index
__syncwarp();
if (lane_id == 0)
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
__atomic_store_n(nvl_channel_tail.buffer(), cached_nvl_channel_tail,
__ATOMIC_RELEASE);
#else
st_release_sys_global(nvl_channel_tail.buffer(),
cached_nvl_channel_tail);
#endif
}
// Retired
__syncwarp();
@@ -1589,7 +1609,7 @@ __global__ void cached_notify(
num_d2h_channel_addrs, nvl_rank, 3);

// Barrier for NVL
barrier_block<NUM_MAX_NVL_PEERS, true>(barrier_signal_ptrs, nvl_rank);
barrier_block<NUM_MAX_NVL_PEERS, true, 4>(barrier_signal_ptrs, nvl_rank);

// Clean RDMA buffer
auto rdma_buffer_ptr_int = static_cast<int*>(rdma_buffer_ptr);
@@ -1626,7 +1646,7 @@
uccl::nvshmem_sync_with_same_gpu_idx(d2h_channel_addrs,
num_d2h_channel_addrs, nvl_rank);

barrier_block<NUM_MAX_NVL_PEERS>(barrier_signal_ptrs, nvl_rank);
barrier_block<NUM_MAX_NVL_PEERS, false, 5>(barrier_signal_ptrs, nvl_rank);
} else if (sm_id == 1) {
if (is_cached_dispatch) return;

@@ -2611,10 +2631,14 @@ __global__ void __launch_bounds__((kNumForwarders + 1) * WARP_SIZE, 1)
nvl_rank),
channel_id, // NOTE(MaoZiming): use channel_id for rb.
lane_id, 0, d2h_channel_addrs, num_d2h_channel_addrs, false,
#if (defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)) && !defined(SOFTWARE_ORDERING)
-1
#else
-1,
reinterpret_cast<uint64_t>(
rdma_channel_tail.buffer(rdma_rank)) -
reinterpret_cast<uint64_t>(original_atomic_buffer_ptr),
num_chunked_tokens);
num_chunked_tokens
#endif
);
} else {
memory_fence();
}
@@ -2630,7 +2654,13 @@
nvl_rank),
channel_id, // NOTE(MaoZiming): use warp_id for rb.
dst_rdma_rank == rdma_rank, d2h_channel_addrs,
num_d2h_channel_addrs, false, -1, true);
num_d2h_channel_addrs, false, -1,
#if (defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)) && !defined(SOFTWARE_ORDERING)
false
#else
true
#endif
);
}
}
}