ep/bench/vllm/README.md (12 changes: 6 additions & 6 deletions)
@@ -99,7 +99,7 @@ Edit the provided scripts (`launch_vllm_head.sh` and `launch_vllm_worker.sh`) to
 On the **first node** (primary node that handles API requests):
 
 ```bash
-bash launch_vllm_head.sh 10.4.147.22 13345 deepseek-ai/DeepSeek-V3-0324 allgather_reducescatter 16 8 1 8
+bash launch_vllm_head.sh 10.4.147.22 13345 deepseek-ai/DeepSeek-V3-0324 deepep_high_throughput 2 1 8 1
 ```
 
 ### Step 2: Start Node 1+ (Secondary)
@@ -108,15 +108,15 @@ On **each additional node** (secondary nodes in headless mode):

 ```bash
 # Launch Node 1 (headless)
-bash launch_vllm_worker.sh 10.4.147.22 13345 deepseek-ai/DeepSeek-V3-0324 allgather_reducescatter 16 8 1 8
+bash launch_vllm_worker.sh 10.4.147.22 13345 deepseek-ai/DeepSeek-V3-0324 deepep_high_throughput 2 1 8 1
 ```
 
 **Arguments:**
 - `10.4.147.22` - IP address of **Node 0**; should be the IP bound to the `NCCL_SOCKET_IFNAME` interface
 - `13345` - RPC port
 - `deepseek-ai/DeepSeek-V3-0324` - Same model on all nodes
 - `allgather_reducescatter` - EP communication backend
-- `16` - Total DP size
-- `8` - Local DP size on this node
-- `1` - Local TP size on this node
-- `8` - For node 0, number of API servers; for others, starting rank (= sum of previous nodes' local DP)
+- `2` - Total DP size
+- `1` - Local DP size on this node
+- `8` - Local TP size on this node
+- `1` - For node 0, number of API servers; for others, starting rank (= sum of previous nodes' local DP)
ep/bench/vllm/launch_vllm_head.sh (3 changes: 3 additions & 0 deletions)
@@ -8,6 +8,9 @@
 # 3. Install EP kernels: Follow vLLM's EP installation guide
 # 4. For AWS EFA: Install AWS OFI-NCCL plugin
 
+# Example:
+# bash launch_vllm_head.sh 10.4.147.22 13345 deepseek-ai/DeepSeek-V3-0324 deepep_low_latency 2 1 8 1
+
 set -e
 
 echo "🚀 Launching vLLM Node 0 (Primary) with Expert Parallel..."
ep/bench/vllm/launch_vllm_worker.sh (3 changes: 3 additions & 0 deletions)
@@ -10,6 +10,9 @@
 #
 # IMPORTANT: All configuration must match Node 0!
 
+# Example:
+# bash launch_vllm_worker.sh 10.4.147.22 13345 deepseek-ai/DeepSeek-V3-0324 allgather_reducescatter 2 1 8 1
+
 set -e
 
 echo "🚀 Launching vLLM Secondary Node (Headless) with Expert Parallel..."
ep/src/internode.cu (29 changes: 17 additions & 12 deletions)
@@ -274,11 +274,16 @@ __global__ void notify_dispatch(
           i)[NUM_MAX_NVL_PEERS + num_rdma_experts];
       recv_rdma_rank_prefix_sum[i] = sum;
     }
-    if (num_worst_tokens == 0) {
+    // NOTE(MaoZiming): wrapping this block in `if (num_worst_tokens == 0)`
+    // somehow deadlocks vLLM in deepep_high_throughput mode. I suspect
+    // compiler reordering, but the root cause is unknown. Since
+    // num_worst_tokens is 0 here anyway, removing the `if` is logically
+    // redundant but harmless.
+    // if (num_worst_tokens == 0) {
     while (ld_volatile_global(moe_recv_rdma_counter_mapped) != -1)
       ;
     *moe_recv_rdma_counter_mapped = sum;
-    }
+    // }
   }

   // Send numbers of tokens per rank/expert to NVL ranks
@@ -306,24 +311,24 @@ __global__ void notify_dispatch(
       sum += nvl_recv_num_tokens_per_rank.buffer(src_nvl_rank)[src_rdma_rank];
       recv_gbl_rank_prefix_sum[i] = sum;
     }
-    if (num_worst_tokens == 0) {
-      while (ld_volatile_global(moe_recv_counter_mapped) != -1)
+    // if (num_worst_tokens == 0) {
+    while (ld_volatile_global(moe_recv_counter_mapped) != -1)
       ;
     *moe_recv_counter_mapped = sum;
-    }
+    // }
   }
   if (thread_id < num_nvl_experts) {
     int sum = 0;
 #pragma unroll
     for (int i = 0; i < NUM_MAX_NVL_PEERS; ++i)
       sum += nvl_recv_num_tokens_per_expert.buffer(i)[thread_id];
     sum = (sum + expert_alignment - 1) / expert_alignment * expert_alignment;
-    if (num_worst_tokens == 0) {
-      while (ld_volatile_global(moe_recv_expert_counter_mapped + thread_id) !=
-             -1)
-        ;
-      moe_recv_expert_counter_mapped[thread_id] = sum;
-    }
+    // if (num_worst_tokens == 0) {
+    while (ld_volatile_global(moe_recv_expert_counter_mapped + thread_id) !=
+           -1)
+      ;
+    // }
+    moe_recv_expert_counter_mapped[thread_id] = sum;
   }
 
   // Finally barrier
@@ -1610,7 +1615,7 @@ __global__ void cached_notify(
   // Barrier for RDMA
   if (thread_id == WARP_SIZE)
     uccl::nvshmem_sync_with_same_gpu_idx(d2h_channel_addrs,
-                                         num_d2h_channel_addrs, nvl_rank, 3);
+                                         num_d2h_channel_addrs, nvl_rank);
 
   // Barrier for NVL
   barrier_block<NUM_MAX_NVL_PEERS, true>(barrier_signal_ptrs, nvl_rank);
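For context on the synchronization these internode.cu hunks touch: `notify_dispatch` publishes token counts through host-mapped counters that the host pre-resets to -1, and `ld_volatile_global` forces the spin-wait to re-read memory on every iteration instead of letting the compiler hoist the load. Below is a minimal, self-contained sketch of that handshake, not the actual UCCL/DeepEP code: `publish_count` and `ld_volatile` are illustrative stand-ins, and the single `cudaHostAllocMapped` counter (plus a UVA-capable GPU) is an assumption made so the example runs on its own.

```cuda
#include <cstdio>
#include <cuda_runtime.h>

// Illustrative stand-in for ld_volatile_global: a volatile load the
// compiler must re-issue on every loop iteration rather than hoist.
__device__ __forceinline__ int ld_volatile(int const* ptr) {
  return *reinterpret_cast<int const volatile*>(ptr);
}

// Sketch of the counter handshake: spin until the host has reset the
// mapped counter to the -1 sentinel, then publish the computed count.
__global__ void publish_count(int* counter, int sum) {
  while (ld_volatile(counter) != -1)
    ;
  *counter = sum;  // host polls for any value != -1
}

int main() {
  int* counter = nullptr;
  // Host-mapped (zero-copy) memory visible to both host and device;
  // assumes UVA so the host pointer is directly usable in the kernel.
  cudaHostAlloc(reinterpret_cast<void**>(&counter), sizeof(int),
                cudaHostAllocMapped);
  *counter = -1;  // host-side reset: previous value already consumed
  publish_count<<<1, 1>>>(counter, 42);
  cudaDeviceSynchronize();
  printf("tokens to receive: %d\n", *counter);  // prints 42
  cudaFreeHost(counter);
  return 0;
}
```

Since `num_worst_tokens` is 0 on this path, guarding the spin with `if (num_worst_tokens == 0)` should be a no-op; that is why the NOTE above points at ordering or compiler effects rather than at the guard's logic.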
ep/src/proxy.cpp (1 change: 1 addition & 0 deletions)
@@ -864,6 +864,7 @@ void Proxy::post_gpu_commands_mixed(
 #ifdef USE_MSCCLPP_FIFO_BACKEND
     assert(barrier_wrs.size() == 1 && ctx_.barrier_wr == -1);
 #endif
+    assert(quiet_wrs.empty() && "quiet_wrs should be empty");
     send_barrier(barrier_wrs[0]);
     barrier_wrs.clear();
     barrier_cmds.clear();
ep/src/uccl_ep.cc (2 changes: 0 additions & 2 deletions)
@@ -611,7 +611,6 @@ class Buffer {
         num_ranks),
         num_nvl_bytes, low_latency_mode, d_handles, num_d2h_channel_addrs,
         atomic_buffer_ptr);
-
     // Synchronize total received tokens and tokens per expert
     if (num_worst_tokens > 0) {
       num_recv_tokens = num_worst_tokens;
@@ -880,7 +879,6 @@ class Buffer {
         config.get_rdma_buffer_size_hint(hidden_int4 * sizeof(int4), num_ranks),
         num_nvl_bytes, false, low_latency_mode, d_handles,
         num_d2h_channel_addrs, atomic_buffer_ptr);
-
     // Assign bias pointers
     auto bias_opts =
         std::vector<std::optional<torch::Tensor>>({bias_0, bias_1});
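The `num_worst_tokens > 0` branch visible in the first uccl_ep.cc hunk is the host-side escape hatch for the same handshake: given a worst-case token bound, the host sizes its buffers up front and never polls the mapped counters. A minimal host-side sketch under stated assumptions: the counters are reset to -1 before the kernel launches, and `wait_num_recv_tokens` is a hypothetical helper, not the real `Buffer` API.

```cpp
#include <sched.h>

// Hypothetical helper (not the actual Buffer API): resolve how many
// tokens this rank will receive. With a worst-case bound the device
// handshake is skipped entirely; otherwise poll the host-mapped counter
// until notify_dispatch publishes a value other than the -1 sentinel.
int wait_num_recv_tokens(int const volatile* moe_recv_counter_mapped,
                         int num_worst_tokens) {
  if (num_worst_tokens > 0)
    return num_worst_tokens;  // worst-case sizing, no polling needed
  int num_recv_tokens;
  while ((num_recv_tokens = *moe_recv_counter_mapped) == -1)
    sched_yield();  // yield the CPU while the kernel is still counting
  return num_recv_tokens;
}
```

The kernel-side guards this PR comments out existed to skip publishing when the caller supplies the bound; with them gone, the counters are simply written even when no one reads them.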