Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,6 @@ ep/figs

ep/deep_ep_wrapper/deep_ep.egg-info/
*.json
*result.jsonl
*result.jsonl

ep/deep_ep_wrapper/sglang_profiles*
4 changes: 2 additions & 2 deletions ep/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ NVCCFLAGS += $(EFA_CFLAGS) $(GH_CFLAGS) $(NORMAL_CFLAGS)
LDFLAGS += $(EFA_LDFLAGS)
INCLUDES += $(EFA_CFLAGS) $(GH_CFLAGS) $(NORMAL_CFLAGS)

SRC_CPP := src/proxy.cpp src/rdma.cpp src/common.cpp src/peer_copy_worker.cpp src/uccl_proxy.cpp src/uccl_bench.cpp src/peer_copy_manager.cpp src/fifo.cpp
SRC_CU := src/bench_kernel.cu src/peer_copy.cu src/internode_ll.cu src/internode.cu src/layout.cu src/intranode.cu src/ep_runtime.cu
SRC_CPP := src/proxy.cpp src/rdma.cpp src/common.cpp src/uccl_proxy.cpp src/uccl_bench.cpp src/fifo.cpp
SRC_CU := src/bench_kernel.cu src/internode_ll.cu src/internode.cu src/layout.cu src/intranode.cu src/ep_runtime.cu

OBJ_CPP := $(SRC_CPP:.cpp=.o)
OBJ_CU := $(SRC_CU:.cu=.o)
Expand Down
4 changes: 2 additions & 2 deletions ep/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ combined_x, event, hook = buffer.low_latency_combine(

Initialization and teardown:
```python
proxies, workers = initialize_uccl(scratch, num_rdma_bytes, rank, num_ranks, group, args.num_experts)
destroy_uccl(proxies, workers)
proxies = initialize_uccl(scratch, num_rdma_bytes, rank, num_ranks, group, args.num_experts)
destroy_uccl(proxies)
```

## Benchmark
Expand Down
6 changes: 3 additions & 3 deletions ep/bench/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ HEADERS += $(wildcard include/*.h include/*.cuh include/*.hpp)

ifeq ($(HAS_EFA),1)
SRC_CPP := ../src/proxy.cpp ../src/rdma.cpp ../src/common.cpp
SRC_CPP += ../src/peer_copy_worker.cpp ../src/uccl_proxy.cpp
SRC_CPP += ../src/uccl_bench.cpp ../src/peer_copy_manager.cpp ../src/fifo.cpp
SRC_CPP += ../src/uccl_proxy.cpp
SRC_CPP += ../src/uccl_bench.cpp ../src/fifo.cpp

SRC_CU := ../src/bench_kernel.cu ../src/peer_copy.cu
SRC_CU := ../src/bench_kernel.cu
SRC_CU += ../src/internode_ll.cu ../src/internode.cu ../src/layout.cu
SRC_CU += ../src/intranode.cu ../src/ep_runtime.cu

Expand Down
8 changes: 0 additions & 8 deletions ep/bench/benchmark_rdma_rb.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,7 @@ def run_rank1_remote(
mode="remote",
peers_meta_list=peers_meta_list,
)
device_index = int(os.environ.get("LOCAL_RANK", "0"))
workers = ep.PeerCopyManager(src_device=device_index)
workers.start_for_proxies(proxies)
print("[rank 1] PeerCopyManager started.", flush=True)
time.sleep(5)
try:
workers.stop()
except Exception:
pass
try:
for p in proxies:
p.stop()
Expand Down
6 changes: 3 additions & 3 deletions ep/bench/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ def __init__(
self.scratch = ep.get_rdma_buffer(num_rdma_bytes, device_index)

rdma_buffer_ptr = self.scratch.data_ptr()
self.proxies, self.workers = initialize_uccl(
self.proxies = initialize_uccl(
rdma_buffer_ptr,
num_rdma_bytes,
group.rank(),
dist.get_world_size(group),
group,
use_normal_mode=not low_latency_mode,
use_throughput_mode=not low_latency_mode,
is_intranode=is_intranode,
)
check_nvlink_connections(group)
Expand Down Expand Up @@ -183,7 +183,7 @@ def destroy(self):

self.runtime.destroy()
self.runtime = None
destroy_uccl(self.proxies, self.workers)
destroy_uccl(self.proxies)

@staticmethod
def is_sm90_compiled():
Expand Down
4 changes: 2 additions & 2 deletions ep/bench/test_internode_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def test_simple_internode(rank: int, num_ranks: int, group: dist.ProcessGroup):
scratch = torch.empty(
scratch_nbytes, dtype=torch.uint8, device=f"cuda:{device_index}"
)
proxies, workers = initialize_uccl(scratch, scratch_nbytes, rank, num_ranks, group)
proxies = initialize_uccl(scratch, scratch_nbytes, rank, num_ranks, group)

try:
buffer = Buffer(
Expand Down Expand Up @@ -146,7 +146,7 @@ def test_simple_internode(rank: int, num_ranks: int, group: dist.ProcessGroup):
dist.barrier()
print("[simple-test] ✓ Buffer destroyed", flush=True)

destroy_uccl(proxies, workers)
destroy_uccl(proxies)
dist.barrier()


Expand Down
48 changes: 4 additions & 44 deletions ep/bench/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,6 @@ def get_cpu_proxies_meta(proxies, rank, scratch_ptr, scratch_bytes, num_ranks, g
torch.cuda.set_device(device_index)
dist.all_gather_object(all_meta, meta, group=group)
rank2meta = {m["rank"]: m for m in all_meta}

# Debug: print IP distribution
ip_counts = {}
for m in all_meta:
ip = m["ip"]
ip_counts[ip] = ip_counts.get(ip, 0) + 1
if rank == 0:
print(f"[DEBUG] IP distribution across {num_ranks} ranks:", flush=True)
for ip, count in ip_counts.items():
print(f"[DEBUG] {ip}: {count} ranks", flush=True)

return rank2meta


Expand Down Expand Up @@ -502,14 +491,8 @@ def initialize_uccl(
group,
num_experts=0,
is_intranode=False,
use_normal_mode=False,
use_throughput_mode=False,
):
try:
for shm_file in glob.glob("/dev/shm/uccl_barrier_*"):
os.remove(shm_file)
except Exception:
pass

# Try to get local_rank from environment or infer from current device
if "LOCAL_RANK" in os.environ:
local_rank = int(os.environ["LOCAL_RANK"])
Expand Down Expand Up @@ -558,7 +541,7 @@ def initialize_uccl(
num_experts=num_experts,
num_ranks=num_ranks,
num_nodes=num_nodes,
use_normal_mode=use_normal_mode,
use_throughput_mode=use_throughput_mode,
is_intranode=is_intranode,
)
proxies.append(proxy)
Expand All @@ -578,35 +561,17 @@ def initialize_uccl(
if not is_intranode:
for proxy in proxies:
proxy.start_dual()

workers = None
# if hasattr(ep, "PeerCopyManager"):
# try:
# workers = ep.PeerCopyManager(src_device=local_rank)
# workers.start_for_proxies(proxies)
# if rank == 0:
# print("✓ PeerCopyManager started", flush=True)
# except Exception as e:
# if rank == 0:
# print(f"PeerCopyManager unavailable: {e}", flush=True)

time.sleep(3)
return proxies, workers
return proxies


def destroy_uccl(proxies, workers):
def destroy_uccl(proxies):
# Prefer LOCAL_RANK from the environment; otherwise fall back to the current CUDA device
if "LOCAL_RANK" in os.environ:
device_index = int(os.environ["LOCAL_RANK"])
else:
device_index = torch.cuda.current_device()

if workers is not None:
try:
workers.stop()
except Exception:
pass

try:
for p in proxies:
p.stop()
Expand All @@ -616,11 +581,6 @@ def destroy_uccl(proxies, workers):
ep.unregister_proxy(device_index)
except Exception:
pass
try:
for shm_file in glob.glob("/dev/shm/uccl_barrier_*"):
os.remove(shm_file)
except Exception:
pass


def per_token_cast_to_fp8(x: torch.Tensor):
Expand Down
1 change: 1 addition & 0 deletions ep/deep_ep_wrapper/scripts/sglang_nccl_44000_prefill.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export NCCL_SOCKET_IFNAME="^lo,docker"
export FI_PROVIDER=efa
export FI_EFA_USE_DEVICE_RDMA=1
export SGLANG_ENABLE_JIT_DEEPGEMM=1
export SGLANG_TORCH_PROFILER_DIR=/workspace/uccl/ep/deep_ep_wrapper/sglang_profiles_nccl

# Parameters
MODEL_PATH="deepseek-ai/DeepSeek-R1-0528"
Expand Down
1 change: 1 addition & 0 deletions ep/deep_ep_wrapper/scripts/sglang_uep_46000_prefill.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export NCCL_SOCKET_IFNAME="^lo,docker"
export FI_PROVIDER=efa
export FI_EFA_USE_DEVICE_RDMA=1
export SGLANG_ENABLE_JIT_DEEPGEMM=1
export SGLANG_TORCH_PROFILER_DIR=/workspace/uccl/ep/deep_ep_wrapper/sglang_profiles

# Parameters
MODEL_PATH="deepseek-ai/DeepSeek-R1-0528"
Expand Down
17 changes: 2 additions & 15 deletions ep/include/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,11 @@

#define MAX_IB_DEVS 32
// #define MEASURE_PER_OP_LATENCY
// #define MEASURE_PER_VERB_LATENCY

// Barrier type selection (can be overridden at compile time)
#ifndef USE_SENDER_BARRIER
#ifdef EFA
#define USE_RECEIVER_BARRIER
#endif
#endif

#ifdef EFA
#define EFA_QP_LOW_LATENCY_SERVICE_LEVEL 8

extern bool use_ll_sl;
#endif

#define USE_MSCCLPP_FIFO_BACKEND
// #define USE_SUBSET_BARRIER
#define kAtomicBufferSize 81960
#define kQueueSize 2048
#define kQueueMask (kQueueSize - 1)
Expand All @@ -41,9 +30,7 @@ extern bool use_ll_sl;
#define kIterations 40000
#define kNumProxyThs 4
#define kTestNumGpuThPerBlock 1
#define kObjectSize 7168 // 7 KB
// #define kObjectSize 10752 // 10.5 KB
// #define kObjectSize 14336 // 14 KB
#define kObjectSize 7168 // 7 KB
#define kMaxOutstandingSends 2048 // = max_send_wr, max_recv_wr, cq_depth / 2
#define kMaxOutstandingRecvs 2048
#define kSenderAckQueueDepth 2048
Expand Down
37 changes: 0 additions & 37 deletions ep/include/peer_copy.cuh

This file was deleted.

20 changes: 0 additions & 20 deletions ep/include/peer_copy_manager.hpp

This file was deleted.

33 changes: 0 additions & 33 deletions ep/include/peer_copy_worker.hpp

This file was deleted.

4 changes: 1 addition & 3 deletions ep/include/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ class Proxy {
int num_experts = 0;
int num_ranks = 0;
int num_nodes = 0;
bool use_normal_mode =
false; // Runtime flag for normal mode (batching optimization)
bool use_throughput_mode = false;
bool is_intranode = false;
};

Expand All @@ -70,7 +69,6 @@ class Proxy {

void run_sender();
void run_remote();
void run_local();
void run_dual();
void pin_thread_to_cpu_wrapper();
void pin_thread_to_numa_wrapper();
Expand Down
Loading