4 changes: 3 additions & 1 deletion .gitignore
@@ -100,4 +100,6 @@ ep/figs

ep/deep_ep_wrapper/deep_ep.egg-info/
*.json
*result.jsonl
*result.jsonl

ep/deep_ep_wrapper/sglang_profiles*
14 changes: 13 additions & 1 deletion ep/bench/buffer.py
@@ -123,6 +123,7 @@ def __init__(
low_latency_mode,
explicitly_destroy,
int(os.environ.get("LOCAL_WORLD_SIZE", -1)),
Buffer.disable_ll_layered(),
)
if num_rdma_bytes:
self.runtime.set_rdma_buffer_raw(rdma_buffer_ptr)
@@ -173,6 +174,13 @@ def reset_rdma_buffer(self):
def connect_atomic_buffer(self, proxy: "ep.UcclProxy"):
ep.connect_atomic_buffer(proxy, self.runtime)

@staticmethod
def disable_ll_layered() -> bool:
disable_ll_layered = False
if int(os.environ.get("DEEPEP_DISABLE_LL_DISPATCH_OPT", "0")) == 1:
disable_ll_layered = True
return disable_ll_layered

def destroy(self):
"""
Destroy the cpp runtime and release resources.
@@ -453,7 +461,11 @@ def get_low_latency_rdma_size_hint(
size: the RDMA buffer size recommended.
"""
return ep.get_low_latency_rdma_size_hint(
num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts
Buffer.disable_ll_layered(),
num_max_dispatch_tokens_per_rank,
hidden,
num_ranks,
num_experts,
)

def get_comm_stream(self) -> torch.Stream:
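The new Buffer.disable_ll_layered() helper gates the layered low-latency dispatch layout on the DEEPEP_DISABLE_LL_DISPATCH_OPT environment variable, and the flag is now forwarded to ep.get_low_latency_rdma_size_hint so the size hint matches the active layout. A minimal usage sketch (the import path is assumed from the file location ep/bench/buffer.py and is not part of this diff):

# Sketch only: shows how the environment variable toggles the helper.
import os
from ep.bench.buffer import Buffer  # assumed import path

os.environ["DEEPEP_DISABLE_LL_DISPATCH_OPT"] = "1"   # opt out of the layered layout
assert Buffer.disable_ll_layered() is True

os.environ["DEEPEP_DISABLE_LL_DISPATCH_OPT"] = "0"   # default: layered layout enabled
assert Buffer.disable_ll_layered() is False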
35 changes: 30 additions & 5 deletions ep/include/ep_config.hpp
@@ -174,10 +174,12 @@ struct LowLatencyLayout {
count);
}

LowLatencyLayout(void* rdma_buffer, int num_max_dispatch_tokens_per_rank,
int hidden, int num_ranks, int num_experts,
LowLatencyLayout(bool disable_ll_layered, void* rdma_buffer,
int num_max_dispatch_tokens_per_rank, int hidden,
int num_ranks, int num_experts,
void* atomic_buffer_ptr = nullptr) {
int const num_scales = hidden / 128;
int const num_nodes = num_ranks / NUM_MAX_NVL_PEERS;

// Dispatch and combine layout:
// - 2 symmetric odd/even send buffer
@@ -188,9 +190,18 @@
// NOTES: you should add a control `int4` for combine messages if you want
// to do data transformation
EP_HOST_ASSERT(num_scales * sizeof(float) <= static_cast<size_t>(hidden));
size_t per_meta_data_size = sizeof(int4);
size_t per_token_size_unaligned = std::max(hidden * sizeof(nv_bfloat16),
hidden + num_scales * sizeof(float)) +
sizeof(int); // Flag at end of data
// Align to sizeof(int4) for efficient vectorized copies
size_t per_token_size = align<size_t>(per_token_size_unaligned, sizeof(int4));
size_t num_bytes_per_dispatch_msg =
sizeof(int4) + std::max(hidden * sizeof(nv_bfloat16),
hidden + num_scales * sizeof(float));
if (!disable_ll_layered) {
num_bytes_per_dispatch_msg = per_meta_data_size + per_token_size;
}
size_t num_bytes_per_combine_msg = hidden * sizeof(nv_bfloat16);

// Send buffer
@@ -209,6 +220,12 @@
size_t dispatch_recv_data_buffer_bytes = num_experts *
num_max_dispatch_tokens_per_rank *
num_bytes_per_dispatch_msg;
if (!disable_ll_layered) {
dispatch_recv_data_buffer_bytes =
num_experts * num_max_dispatch_tokens_per_rank * per_meta_data_size +
num_nodes * num_max_dispatch_tokens_per_rank * per_token_size;
}

size_t combine_recv_buffer_bytes = num_experts *
num_max_dispatch_tokens_per_rank *
num_bytes_per_combine_msg;
@@ -219,6 +236,12 @@

// Symmetric signaling buffers
size_t dispatch_recv_count_buffer_bytes = num_experts * sizeof(int);
if (!disable_ll_layered) {
dispatch_recv_count_buffer_bytes += NUM_MAX_NVL_PEERS * num_nodes *
num_max_dispatch_tokens_per_rank *
sizeof(int) +
NUM_MAX_NVL_PEERS * sizeof(int);
}
size_t combine_recv_flag_buffer_bytes = dispatch_recv_count_buffer_bytes;
size_t signaling_buffer_bytes = std::max(dispatch_recv_count_buffer_bytes,
combine_recv_flag_buffer_bytes);
@@ -261,11 +284,13 @@
}
};

size_t get_low_latency_rdma_size_hint(int num_max_dispatch_tokens_per_rank,
size_t get_low_latency_rdma_size_hint(bool dispatch_ll_dispatch_opt,
int num_max_dispatch_tokens_per_rank,
int hidden, int num_ranks,
int num_experts) {
auto num_bytes = LowLatencyLayout(nullptr, num_max_dispatch_tokens_per_rank,
hidden, num_ranks, num_experts, nullptr)
auto num_bytes = LowLatencyLayout(dispatch_ll_dispatch_opt, nullptr,
num_max_dispatch_tokens_per_rank, hidden,
num_ranks, num_experts, nullptr)
.total_bytes;
return ((num_bytes + NUM_BUFFER_ALIGNMENT_BYTES) /
NUM_BUFFER_ALIGNMENT_BYTES) *
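For reference, the layered sizing introduced above can be reproduced with a few lines of arithmetic. The sketch below recomputes per_token_size and the layered dispatch_recv_data_buffer_bytes; all concrete values, and NUM_MAX_NVL_PEERS = 8, are illustrative assumptions rather than values taken from this diff:

# Sketch of the layered dispatch sizing from LowLatencyLayout.
def align(x, a):
    # Assumed to round up, matching the align<size_t>() helper used in the header.
    return (x + a - 1) // a * a

hidden = 7168                # assumed hidden size
num_ranks = 16               # assumed
num_experts = 256            # assumed
tokens = 128                 # num_max_dispatch_tokens_per_rank, assumed
NUM_MAX_NVL_PEERS = 8        # assumed constant value

num_scales = hidden // 128
num_nodes = num_ranks // NUM_MAX_NVL_PEERS

per_meta_data_size = 16      # sizeof(int4)
# max(bf16 payload, byte-per-element payload plus float scales) + trailing int flag,
# aligned to sizeof(int4) for vectorized copies
per_token_size = align(max(hidden * 2, hidden + num_scales * 4) + 4, 16)

# Layered layout: per-expert metadata region plus a per-node token payload region
dispatch_recv_data_buffer_bytes = (
    num_experts * tokens * per_meta_data_size
    + num_nodes * tokens * per_token_size
)
print(per_token_size, dispatch_recv_data_buffer_bytes)

With these assumed values the per-token payload rounds up from 14340 to 14352 bytes.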
18 changes: 10 additions & 8 deletions ep/include/internode_ll.cuh
@@ -11,9 +11,10 @@ namespace internode_ll {
void clean_low_latency_buffer(int* clean_0, int num_clean_int_0, int* clean_1,
int num_clean_int_1, cudaStream_t stream);
// Dummy host launcher declaration
void dispatch(void* packed_recv_x, void* packed_recv_x_scales,
int* packed_recv_src_info, int64_t* packed_recv_layout_range,
int* packed_recv_count, int* cumulative_local_expert_recv_stats,
void dispatch(bool dispatch_ll_dispatch_opt, void* packed_recv_x,
void* packed_recv_x_scales, int* packed_recv_src_info,
int64_t* packed_recv_layout_range, int* packed_recv_count,
int* cumulative_local_expert_recv_stats,
int64_t* dispatch_wait_recv_cost_stats, void* rdma_recv_x,
int* rdma_recv_count, void* rdma_x, void const* x,
int64_t const* topk_idx, int* next_clean, int* next_clean_second,
@@ -29,11 +30,12 @@ void dispatch(void* packed_recv_x, void* packed_recv_x_scales,
void* atomic_buffer_ptr = nullptr,
int* rdma_recv_count_internode = nullptr);

void combine(void* combined_x, void* rdma_recv_x, int* rdma_recv_flag,
void* rdma_send_x, void const* x, int64_t const* topk_idx,
float const* topk_weights, int const* src_info,
int64_t const* layout_range, int64_t* combine_wait_recv_cost_stats,
int* next_clean, int* next_clean_second, int num_next_clean_int,
void combine(bool dispatch_ll_dispatch_opt, void* combined_x, void* rdma_recv_x,
int* rdma_recv_flag, void* rdma_send_x, void const* x,
int64_t const* topk_idx, float const* topk_weights,
int const* src_info, int64_t const* layout_range,
int64_t* combine_wait_recv_cost_stats, int* next_clean,
int* next_clean_second, int num_next_clean_int,
int num_combined_tokens, int hidden,
int num_max_dispatch_tokens_per_rank, int num_topk,
int num_experts, int rank, int num_ranks, bool use_logfmt,