280 changes: 145 additions & 135 deletions ep/bench/test_internode.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions ep/include/ep_configs.cuh
@@ -12,11 +12,19 @@
// #define ENABLE_FAST_DEBUG
#ifndef ENABLE_FAST_DEBUG
#define NUM_CPU_TIMEOUT_SECS 100
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#define NUM_TIMEOUT_CYCLES 20000000000ull
#else
#define NUM_TIMEOUT_CYCLES 200000000000ull // 200G cycles ~= 100s
#endif
#else
#define NUM_CPU_TIMEOUT_SECS 10
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
#define NUM_TIMEOUT_CYCLES 2000000000ull
#else
#define NUM_TIMEOUT_CYCLES 20000000000ull // 20G cycles ~= 10s
#endif
#endif

#define LOW_LATENCY_SEND_PHASE 1
#define LOW_LATENCY_RECV_PHASE 2
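For context, NUM_TIMEOUT_CYCLES is consumed by device-side spin loops; the HIP budget is 10x smaller, presumably because the AMD counter ticks at a different rate than NVIDIA's SM clock. A minimal sketch of such a loop (the wrapper name is hypothetical, and it assumes a clock64()-style cycle counter on both platforms):

```cpp
// Hypothetical wait loop consuming the cycle budget defined above.
__device__ __forceinline__ bool wait_nonzero_or_timeout(int volatile* flag) {
  long long const start = clock64();
  while (*flag == 0) {
    if (clock64() - start > (long long)NUM_TIMEOUT_CYCLES) {
      printf("EP timeout after %llu cycles\n",
             (unsigned long long)NUM_TIMEOUT_CYCLES);
      return false;  // let the caller tear down cleanly
    }
  }
  return true;
}
```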
26 changes: 25 additions & 1 deletion ep/include/ep_utils.cuh
@@ -347,6 +347,29 @@ __forceinline__ __device__ float fast_pow2(int x) {
return *reinterpret_cast<float*>(&bits_x);
}

#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
__forceinline__ __device__ void calculate_fp8_scales(float amax, float& scale,
float& scale_inv,
bool round_scale) {
if (!isfinite(amax) || amax <= 0.0f) {
scale = 1.0f;
scale_inv = 1.0f;
return;
}
float t = amax * kFinfoAmaxInvE4M3;
if (round_scale) {
int e;
frexpf(t, &e);
scale_inv = ldexpf(1.0f, e);
scale = ldexpf(1.0f, -e);
} else {
scale_inv = t;
scale = kFinfoAmaxE4M3 / amax;
}
if (!isfinite(scale) || scale <= 0.0f) scale = 1.0f;
if (!isfinite(scale_inv) || scale_inv <= 0.0f) scale_inv = 1.0f;
}
#else
__forceinline__ __device__ void calculate_fp8_scales(float amax, float& scale,
float& scale_inv,
bool round_scale) {
@@ -359,6 +382,7 @@ __forceinline__ __device__ void calculate_fp8_scales(float amax, float& scale,
scale = kFinfoAmaxE4M3 / amax;
}
}
#endif

// `ld.global.nc.L1::no_allocate` will be translated into
// `LDG.E.NA.[width].CONSTANT` in SASS
@@ -901,7 +925,7 @@ __device__ __forceinline__ void st_relaxed_sys_global(int const* ptr, int val) {
__device__ __forceinline__ int ld_acquire_cta(int const* ptr) {
int ret;
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
HIP_ATOMIC_LOAD(ptr, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP);
ret = HIP_ATOMIC_LOAD(ptr, __ATOMIC_ACQUIRE, __HIP_MEMORY_SCOPE_WORKGROUP);
#else
asm volatile("ld.acquire.cta.s32 %0, [%1];" : "=r"(ret) : "l"(ptr));
#endif
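The `ret =` fix above repairs a real bug: the HIP acquire load's result was computed and then discarded, leaving `ret` uninitialized. A sketch of the kind of consumer that silently misbehaves without the assignment (the wrapper name is hypothetical; st_release_cta is the repo's counterpart store):

```cpp
// Spin until a CTA-scope flag reaches `expected`. With the unassigned load,
// ld_acquire_cta() returned garbage, so a loop like this could exit early or
// never. Acquire ordering makes writes done before the paired
// st_release_cta() visible once the flag matches.
__device__ __forceinline__ void wait_for_flag_cta(int const* flag,
                                                  int expected) {
  while (ld_acquire_cta(flag) != expected) {
  }
}
```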
3 changes: 3 additions & 0 deletions ep/setup.py
@@ -61,6 +61,9 @@
cxx_flags.append("-DDISABLE_AGGRESSIVE_ATOMIC")
nvcc_flags.append("-DDISABLE_AGGRESSIVE_ATOMIC")

cxx_flags.append("-DENABLE_FAST_DEBUG")
nvcc_flags.append("-DENABLE_FAST_DEBUG")

device_arch = os.getenv("TORCH_CUDA_ARCH_LIST", "gfx942")
os.environ["PYTORCH_ROCM_ARCH"] = device_arch

13 changes: 7 additions & 6 deletions ep/src/internode.cu
@@ -819,7 +819,7 @@ __global__ void __launch_bounds__(
acquire_lock(rdma_send_channel_lock + lane_id);
auto latest_tail = rdma_send_channel_tail[lane_id];
auto offset = rdma_tail_idx - latest_tail;
while (offset >= WARP_SIZE) {
while (offset >= 32) {
release_lock(rdma_send_channel_lock + lane_id);
acquire_lock(rdma_send_channel_lock + lane_id);
latest_tail = rdma_send_channel_tail[lane_id];
@@ -830,8 +830,7 @@
// Add the bit and move the ones if possible
auto window = rdma_send_channel_window[lane_id] | (1u << offset);
if (offset == 0) {
auto num_empty_slots =
(~window) == 0 ? WARP_SIZE : __ffs(~window) - 1;
auto num_empty_slots = (~window) == 0 ? 32 : __ffs(~window) - 1;
st_release_cta(rdma_send_channel_tail + lane_id,
latest_tail + num_empty_slots);
window >>= num_empty_slots;
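The WARP_SIZE → 32 edits above look cosmetic but are not: `window` is a 32-bit occupancy bitmask, one bit per in-flight ring slot, so the window holds at most 32 entries even on AMD's 64-lane wavefronts (our reading of the change). A host-compilable sketch of the tail-advance logic, with a portable stand-in for __ffs:

```cpp
#include <cstdio>

// 1-based index of the lowest set bit, 0 if none (mirrors CUDA's __ffs).
static int ffs_portable(unsigned v) {
  if (v == 0) return 0;
  int i = 1;
  while (!(v & 1u)) { v >>= 1; ++i; }
  return i;
}

int main() {
  unsigned window = 0b0111u;  // slots 0..2 completed, slot 3 still pending
  int num_empty_slots = (~window) == 0 ? 32 : ffs_portable(~window) - 1;
  std::printf("advance tail by %d slots\n", num_empty_slots);  // prints 3
  window >>= num_empty_slots;  // slide past the contiguous ready run
  return 0;
}
```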
@@ -1114,9 +1113,10 @@ __global__ void __launch_bounds__(

// Copy data
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
UNROLLED_WARP_COPY(
5, lane_id, hidden_int4, reinterpret_cast<int4*>(dst_shifted),
reinterpret_cast<int4*>(shifted), ld_nc_global, st_na_global);
UNROLLED_WARP_COPY(5, lane_id, num_bytes_per_token / sizeof(int4),
reinterpret_cast<int4*>(dst_shifted),
reinterpret_cast<int4*>(shifted), ld_nc_global,
st_na_global);
#else
if (lane_id == 0) {
tma_load_1d(tma_buffer, shifted, tma_mbarrier, num_bytes_per_token,
@@ -1240,6 +1240,7 @@ __global__ void __launch_bounds__(
}
}
num_tokens_to_recv = warp_reduce_sum(end_offset - start_offset);

auto num_tokens_to_recv_original = num_tokens_to_recv;
// Save for combine usage
if (lane_id < kNumRDMARanks and not kCachedMode)
128 changes: 124 additions & 4 deletions ep/src/internode_ll.cu
@@ -18,6 +18,12 @@ constexpr int kNumMaxWarpGroups = 16;
constexpr int kNumMaxWarpGroups = 32;
#endif

#ifndef UCCL_DEBUG_NAN
#define UCCL_DEBUG_NAN 1
#endif
#define DBG_ROOT (blockIdx.x == 0 && threadIdx.x == 0)
#define DBG_WARP0 (blockIdx.x == 0 && (threadIdx.x / 32) == 0)

template <int kNumThreads>
__launch_bounds__(kNumThreads, 1) __global__
void clean_low_latency_buffer(int* clean_0, int num_clean_int_0,
@@ -84,15 +90,17 @@ __global__ __launch_bounds__(1024, 1) void dispatch(
size_t const hidden_bytes =
kHidden * (kUseFP8 ? sizeof(__nv_fp8_storage_t) : sizeof(nv_bfloat16));
size_t const hidden_int4 = hidden_bytes / sizeof(int4);
EP_DEVICE_ASSERT(hidden_int4 > 0);

// Message package: hidden data, FP8 scales, index at source
// NOTES: currently we have 3 reserved int fields for future use
using vec_t = typename std::conditional<kUseFP8, int2, int4>::type;
size_t const num_bytes_per_msg =
sizeof(int4) + (kUseFP8 ? (kHidden + num_scales * sizeof(float))
sizeof(int4) + (kUseFP8 ? (hidden_bytes + num_scales * sizeof(float))
: (kHidden * sizeof(nv_bfloat16)));
size_t const num_int4_per_msg = num_bytes_per_msg / sizeof(int4);
EP_DEVICE_ASSERT(num_bytes_per_msg % sizeof(int4) == 0);
EP_DEVICE_ASSERT(num_int4_per_msg > 0);

// Expert counts
__shared__ int shared_num_tokens_sent_per_expert[kNumMaxWarpGroups];
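The kHidden → hidden_bytes change above fixes a units mix-up: the FP8 payload term should be counted in bytes, not elements. For one-byte E4M3 the two happen to coincide numerically, so this is an intent fix that also pairs with the new size asserts. A worked size check, with kHidden = 7168 and 128-element scale groups as illustrative assumptions:

```cpp
#include <cstdio>

int main() {
  unsigned long const kHidden = 7168;          // illustrative, not the repo's
  unsigned long const hidden_bytes = kHidden;  // FP8: 1 byte per element
  unsigned long const num_scales = kHidden / 128;  // 56 float scales
  unsigned long const kSizeofInt4 = 16;        // sizeof(int4) on device
  unsigned long const num_bytes_per_msg =
      kSizeofInt4 + hidden_bytes + num_scales * sizeof(float);  // 7408
  std::printf("bytes/msg = %lu, int4-aligned: %s\n", num_bytes_per_msg,
              num_bytes_per_msg % kSizeofInt4 == 0 ? "yes" : "no");
  return 0;
}
```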
@@ -127,6 +135,17 @@ __global__ __launch_bounds__(1024, 1) void dispatch(
reinterpret_cast<uint8_t*>(rdma_x_src_idx) + sizeof(int4));
auto const rdma_x_scales = reinterpret_cast<float*>(
reinterpret_cast<uint8_t*>(rdma_x_vec) + hidden_bytes);
#if defined(__HIP_PLATFORM_AMD__) || defined(__HIPCC__)
if constexpr (kUseFP8) {
// One warp cooperatively initializes the per-token scales buffer.
if (warp_id == 0) {
for (int s = lane_id; s < num_scales; s += WARP_SIZE) {
rdma_x_scales[s] = 1.0f; // inverse scale default
}
}
__syncwarp();
}
#endif

// Overlap top-k index read and source token index writes
auto dst_expert_idx =
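The 1.0f pre-fill of rdma_x_scales in this hunk reads as defensive initialization (our interpretation): receivers dequantize as fp8_value * scale_inv, so a scale slot that is never written and arrives as garbage or NaN poisons every element it covers, whereas 1.0f degrades to "no scaling". A tiny host-side illustration of the failure mode:

```cpp
#include <cmath>
#include <cstdio>

int main() {
  float const decoded_fp8 = 0.5f;             // stand-in for a dequantized value
  float const garbage_scale = std::nanf("");  // unwritten slot, worst case
  std::printf("garbage: %f, default 1.0f: %f\n",
              decoded_fp8 * garbage_scale,    // nan
              decoded_fp8 * 1.0f);            // 0.5
  return 0;
}
```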
@@ -166,8 +185,14 @@ __global__ __launch_bounds__(1024, 1) void dispatch(
amax = warp_reduce_max<16>(amax);
calculate_fp8_scales(amax, scale, scale_inv, round_scale);
if (lane_id == 0 or lane_id == 16)
#endif
rdma_x_scales[i * kNumElemsPerRead / 128] = scale_inv;
#endif
#if UCCL_DEBUG_NAN
if (blockIdx.x == 0 && threadIdx.x == 0) {
float s_chk = reinterpret_cast<float const*>(
rdma_x_scales)[i * kNumElemsPerRead / 128];
printf("[dispatch] token=%d lane=%d i=%d scale_inv(sample)=%e\n",
token_idx, lane_id, i, (double)s_chk);
}
#endif

// Cast into send buffer
vec_t int2_value;
@@ -179,8 +204,44 @@ __global__ __launch_bounds__(1024, 1) void dispatch(
fp32_values[j + 1] * scale};
fp8x2_values[j / 2] =
__nv_cvt_float2_to_fp8x2(fp32x2, __NV_SATFINITE, __NV_E4M3);
#if UCCL_DEBUG_NAN
if (blockIdx.x == 0 && lane_id == 0 && i == 0 && j == 0) {
printf(
"[dispatch][FP8-PACK] token=%d i=%d j=%d a=%e b=%e "
"scale=%e inv=%e amax=%e\n",
token_idx, i, j, (double)fp32x2.x, (double)fp32x2.y,
(double)scale, (double)scale_inv, (double)amax);
unsigned short raw16 =
*reinterpret_cast<unsigned short*>(&fp8x2_values[0]);
printf("[dispatch][FP8-PACK] token=%d raw16(first)=0x%04hx\n",
token_idx, raw16);
}
#endif
}
rdma_x_vec[i] = int2_value;
#if UCCL_DEBUG_NAN
if (DBG_WARP0 && i == 0 && lane_id == 0) {
// dump first 8 FP8 bytes of the message payload we just wrote
unsigned long long raw8 =
*reinterpret_cast<unsigned long long*>(&rdma_x_vec[0]);
printf(
"[dispatch][SEND] token=%d raw8(fp8 bytes)=0x%016llx scale=%e "
"inv=%e amax=%e\n",
token_idx, raw8, (double)scale, (double)scale_inv,
(double)amax);
}
#endif
} else {
// Reinterpret-cast is for C++14 compatibility
rdma_x_vec[i] = *reinterpret_cast<vec_t*>(&int4_value);
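When eyeballing the raw16/raw8 hex dumps emitted above, it helps to decode single E4M3 bytes by hand. A host-side sketch under the OCP E4M3FN layout (1 sign, 4 exponent, 3 mantissa bits, bias 7; exponent 15 with mantissa 7 is NaN, and there is no infinity); this is not the CUDA library decoder:

```cpp
#include <cmath>
#include <cstdio>

float decode_e4m3(unsigned char b) {
  int const sign = (b >> 7) & 1, exp = (b >> 3) & 0xF, man = b & 0x7;
  if (exp == 0xF && man == 0x7) return std::nanf("");
  float const m = exp == 0 ? man / 8.0f : 1.0f + man / 8.0f;  // subnormal?
  int const e = exp == 0 ? -6 : exp - 7;
  float const v = std::ldexp(m, e);
  return sign ? -v : v;
}

int main() {
  // 0x40 -> 2, 0x7e -> 448 (max finite), 0x7f -> nan
  std::printf("%g %g %g\n", decode_e4m3(0x40), decode_e4m3(0x7E),
              decode_e4m3(0x7F));
  return 0;
}
```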
@@ -356,7 +417,15 @@ LOW_LATENCY_DISPATCH_RECV:
local_expert_idx * num_ranks *
num_max_dispatch_tokens_per_rank *
num_aligned_scales;

#if UCCL_DEBUG_NAN
if (DBG_ROOT) {
printf(
"[dispatch][RECV-SETUP] expert=%d src_rank=%d local_expert=%d "
"hidden_bytes=%zu num_scales=%d aligned=%d\n",
responsible_expert_idx, src_rank, local_expert_idx,
(size_t)hidden_bytes, num_scales, num_aligned_scales);
}
#endif
// Shared between sub-warps in warp groups
__shared__ int shared_num_recv_tokens[kNumMaxWarpGroups],
shared_recv_token_begin_idx[kNumMaxWarpGroups];
@@ -425,6 +494,15 @@ LOW_LATENCY_DISPATCH_RECV:
recv_range[src_rank] =
pack2<int, int64_t>(num_recv_tokens, recv_token_begin_idx);
// Add stats for diagnosis
#if UCCL_DEBUG_NAN
if (DBG_ROOT) {
printf(
"[dispatch][RECV-COUNT] src_rank=%d num_recv_tokens=%d begin=%d "
"(ipc=%d, ib=%d)\n",
src_rank, num_recv_tokens, recv_token_begin_idx,
num_recv_tokens_ipc, num_recv_tokens_internode);
}
#endif
if (cumulative_local_expert_recv_stats != nullptr)
atomicAdd(cumulative_local_expert_recv_stats + local_expert_idx,
num_recv_tokens);
@@ -453,9 +531,36 @@ LOW_LATENCY_DISPATCH_RECV:
reinterpret_cast<uint8_t*>(src_src_idx) + sizeof(int4));
auto const dst_data =
recv_x_int4 + (recv_token_begin_idx + i) * hidden_int4;
#if UCCL_DEBUG_NAN
if (DBG_WARP0 && i == 0 && lane_id == 0) {
// peek first 16B (int4) of FP8 payload at source before copy
int4 peek_src = src_data[0];
unsigned long long lo =
*reinterpret_cast<unsigned long long*>(&peek_src);
unsigned long long hi =
*reinterpret_cast<unsigned long long*>(((char*)&peek_src) + 8);
printf(
"[dispatch][RECV-BEFORE] token_off=%d src_raw16=0x%016llx "
"0x%016llx\n",
recv_token_begin_idx, lo, hi);
}
#endif
UNROLLED_WARP_COPY(7, lane_id, hidden_int4, dst_data, src_data,
ld_nc_global, st_na_global);

#if UCCL_DEBUG_NAN
if (DBG_WARP0 && i == 0 && lane_id == 0) {
// peek first 16B (int4) of packed_recv_x after copy
int4 peek_dst = dst_data[0];
unsigned long long lo =
*reinterpret_cast<unsigned long long*>(&peek_dst);
unsigned long long hi =
*reinterpret_cast<unsigned long long*>(((char*)&peek_dst) + 8);
printf(
"[dispatch][RECV-AFTER] token_off=%d dst_raw16=0x%016llx "
"0x%016llx\n",
recv_token_begin_idx, lo, hi);
}
#endif
// Copy scales
if constexpr (kUseFP8) {
// Equivalent CuTe layout:
@@ -476,6 +581,13 @@ LOW_LATENCY_DISPATCH_RECV:
ld_nc_global(src_scales + lane_id));
recv_x_scales[token_idx * token_stride + pack_idx * pack_stride +
elem_idx] = scale;
#if UCCL_DEBUG_NAN
if (DBG_WARP0 && token_idx == recv_token_begin_idx && pack_idx == 0 &&
elem_idx == 0) {
float sc = (float)scale;
printf("[dispatch][RECV-SCALE] first_scale=%e\n", (double)sc);
}
#endif
}
if (lane_id + WARP_SIZE < num_scales) {
auto const pack_idx = (lane_id + WARP_SIZE) / num_elems_per_pack;
@@ -484,6 +596,14 @@ LOW_LATENCY_DISPATCH_RECV:
ld_nc_global(src_scales + lane_id + WARP_SIZE));
recv_x_scales[token_idx * token_stride + pack_idx * pack_stride +
elem_idx] = scale;
#if UCCL_DEBUG_NAN
if (DBG_WARP0 && token_idx == recv_token_begin_idx && pack_idx == 0 &&
elem_idx == 0) {
float sc2 = (float)scale;
printf("[dispatch][RECV-SCALE-2] first_scale_lane+W=%e\n",
(double)sc2);
}
#endif
}
}
}
6 changes: 6 additions & 0 deletions ep/src/proxy.cpp
@@ -840,6 +840,12 @@ void Proxy::post_gpu_commands_mixed(
0) {
return;
}

printf(
"[post_gpu_commands_mixed] thread %d: Posting %zu RDMA writes, %zu "
"atomics, %zu barriers, %zu quiets\n",
cfg_.thread_idx, rdma_wrs.size(), atomic_wrs.size(), barrier_cmds.size(),
quiet_cmds.size());
// Handle regular RDMA writes
if (!rdma_wrs.empty()) {
post_rdma_async_batched(ctx_, cfg_.gpu_buffer, rdma_wrs.size(), rdma_wrs,