diff --git a/docker/Dockerfile.rocm b/docker/Dockerfile.rocm
index 1f06e1961..398671d63 100644
--- a/docker/Dockerfile.rocm
+++ b/docker/Dockerfile.rocm
@@ -43,6 +43,20 @@ RUN python${PY_VER} -m pip install --no-cache-dir build auditwheel pybind11
 RUN ln -sf /usr/bin/python${PY_VER} /usr/local/bin/python3 && \
     ln -sf /usr/bin/python${PY_VER}-config /usr/local/bin/python3-config
 
+# ───── Install PyTorch with ROCm support ─────
+# PyTorch does not yet support ROCm 7.x, so install it only for the ROCm 6.4 base image
+ARG BASE_IMAGE
+RUN set -ex; \
+    if [ "$BASE_IMAGE" = "rocm/dev-ubuntu-22.04:6.4.3-complete" ]; then \
+        echo "Installing pip and PyTorch with ROCm 6.4 support..."; \
+        apt-get update && apt-get install -y python3-pip; \
+        python3 -m pip install --upgrade pip; \
+        python3 -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/rocm6.4; \
+        python3 -c "import torch; print('CUDA available:', torch.cuda.is_available())"; \
+    else \
+        echo "BASE_IMAGE='$BASE_IMAGE', skipping PyTorch installation."; \
+    fi
+
 WORKDIR /io
 
 CMD ["bash"]
\ No newline at end of file
diff --git a/p2p/Makefile b/p2p/Makefile
index 2f9c41bf6..f5e9a491b 100644
--- a/p2p/Makefile
+++ b/p2p/Makefile
@@ -36,7 +36,7 @@ PLUGIN_SO := libuccl_engine.so
 CAPI_SOURCE := uccl_engine.cc
 CAPI_HEADER := uccl_engine.h
 CAPI_OBJECT := $(CAPI_SOURCE:.cc=.o)
-SOURCES := engine.cc pybind_engine.cc
+SOURCES := engine.cc pybind_engine.cc tensor.cc
 CORE_OBJECT := engine.o
 OBJECTS := $(SOURCES:.cc=.o)
 
diff --git a/p2p/Makefile.rocm b/p2p/Makefile.rocm
index 86b5817a0..cfd493002 100644
--- a/p2p/Makefile.rocm
+++ b/p2p/Makefile.rocm
@@ -29,7 +29,7 @@ LDFLAGS = -L$(HIP_LIB) -lamdhip64 \
 
 # Target and source files
 TARGET := p2p$(PYEXT)
-SOURCES := engine.cc pybind_engine.cc
+SOURCES := engine.cc pybind_engine.cc tensor.cc
 OBJECTS := $(SOURCES:.cc=.o)
 
 # Default target
diff --git a/p2p/README.md b/p2p/README.md
index a80947314..02fb7dce7 100644
--- a/p2p/README.md
+++ b/p2p/README.md
@@ -285,31 +285,25 @@ if success:
 
 ```python
 # Sender side
-send_tensor = torch.ones(1024, dtype=torch.float32)
-assert send_tensor.is_contiguous()  # Ensure tensor is contiguous
+from uccl.utils import create_tensor, get_tensor_id_by_tensor
+send_tensor, mr_id = create_tensor((1024,), dtype=torch.float32, device="cpu")
 
-# Register tensor for RDMA
-success, mr_id = endpoint.reg(send_tensor.data_ptr(), send_tensor.numel() * 4)
-assert success
+# You can get mr_id later with get_tensor_id_by_tensor
+mr_id = get_tensor_id_by_tensor(send_tensor)
 
 # Send the tensor
 success = endpoint.send(conn_id, mr_id, send_tensor.data_ptr(), send_tensor.numel() * 4)
 assert success
 
 # Receiver side
-recv_tensor = torch.zeros(1024, dtype=torch.float32)
-assert recv_tensor.is_contiguous()
-
-# Register receive buffer
-success, mr_id = endpoint.reg(recv_tensor.data_ptr(), recv_tensor.numel() * 4)
-assert success
+recv_tensor, mr_id = create_tensor((1024,), dtype=torch.float32, device="cpu")
 
 # Receive the tensor
 success = endpoint.recv(conn_id, mr_id, recv_tensor.data_ptr(), recv_tensor.numel() * 4)
 assert success
 ```
 
-### NumPy Array Transfer
+### NumPy Array Transfer [TODO]
 
 ```python
 import numpy as np
@@ -338,21 +332,13 @@ success = endpoint.recv(conn_id, recv_mr_id, recv_ptr, recv_data.nbytes)
 
 ```python
 # Sender side - send multiple tensors at once
-tensors = [
-    torch.ones(512, dtype=torch.float32),
-    torch.ones(1024, dtype=torch.float32),
-    torch.ones(256, dtype=torch.float32)
-]
-
-# Ensure all tensors are contiguous
-for tensor in tensors:
-    assert tensor.is_contiguous()
-
-# Register all tensors
+shapes = [(512,), (1024,), (256,)]
+tensors = []
 mr_ids = []
-for tensor in tensors:
-    success, mr_id = endpoint.reg(tensor.data_ptr(), tensor.numel() * 4)
-    assert success
+for shape in shapes:
+    tensor, mr_id = create_tensor(shape, dtype=torch.float32, device="cuda:0")
+    assert tensor.is_contiguous()
+    tensors.append(tensor)
     mr_ids.append(mr_id)
 
 # Prepare data for vectorized send
@@ -365,18 +351,13 @@ success = endpoint.sendv(conn_id, mr_ids, ptr_list, size_list, num_iovs)
 assert success
 
 # Receiver side - receive multiple tensors at once
-recv_tensors = [
-    torch.zeros(512, dtype=torch.float32),
-    torch.zeros(1024, dtype=torch.float32),
-    torch.zeros(256, dtype=torch.float32)
-]
-
-# Register receive buffers
+recv_tensors = []
 recv_mr_ids = []
-for tensor in recv_tensors:
-    success, mr_id = endpoint.reg(tensor.data_ptr(), tensor.numel() * 4)
-    assert success
-    recv_mr_ids.append(mr_id)
+for shape in shapes:
+    recv_tensor, recv_mr_id = create_tensor(shape, dtype=torch.float32, device="cuda:0")
+    assert recv_tensor.is_contiguous()
+    recv_tensors.append(recv_tensor)
+    recv_mr_ids.append(recv_mr_id)
 
 # Prepare data for vectorized receive
 recv_ptr_list = [tensor.data_ptr() for tensor in recv_tensors]
@@ -394,6 +375,20 @@ assert success
 Click me
 
+### Tensor
+
+#### create_tensor
+```python
+create_tensor(shape, dtype, device) -> (tensor, tensor_id)
+```
+Create an empty tensor and register its memory (GPU or pinned CPU).
+Automatically handles CUDA device parsing and memory registration.
+
+**Parameters:**
+- `shape` (Tuple[int, ...]): Shape of the tensor, e.g. `(1024,)`
+- `dtype` (torch.dtype): Element type, e.g. `torch.float32`
+- `device` (str): `"cpu"` or a CUDA device string such as `"cuda:0"`
+
 ### Endpoint Class
 
 #### Constructor
 
@@ -434,21 +429,6 @@ Accept an incoming connection (blocking).
 - `remote_gpu_idx` (int): GPU index of connecting client
 - `conn_id` (int): Connection ID for subsequent operations
 
-#### Memory Registration
-
-```python
-reg(ptr, size) -> (success, mr_id)
-```
-Register a memory region for RDMA operations.
-
-**Parameters:**
-- `ptr` (int): Memory pointer (use `tensor.data_ptr()` for PyTorch)
-- `size` (int): Size in bytes
-
-**Returns:**
-- `success` (bool): Whether registration succeeded
-- `mr_id` (int): Memory region ID for transfer operations
-
 #### Data Transfer
 
 ```python
@@ -458,7 +438,7 @@ Send data to remote endpoint (blocking).
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID from register
+- `mr_id` (int): Memory region ID (Tensor ID) from a registered tensor
 - `ptr` (int): Pointer to data to send
 - `size` (int): Number of bytes to send
 
@@ -472,7 +452,7 @@ Receive data from remote endpoint (blocking).
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID from register
+- `mr_id` (int): Memory region ID (Tensor ID) from a registered tensor
 - `ptr` (int): Pointer to buffer for received data
 - `size` (int): Number of bytes to receive
 
@@ -486,7 +466,7 @@ Send multiple memory regions to remote endpoint in a single operation (blocking)
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id_list` (list[int]): List of memory region IDs from register
+- `mr_id_list` (list[int]): List of memory region IDs (Tensor IDs) from registered tensors
 - `ptr_list` (list[int]): List of pointers to data to send
 - `size_list` (list[int]): List of sizes in bytes for each memory region
 - `num_iovs` (int): Number of I/O vectors (length of the lists)
 
@@ -501,7 +481,7 @@ Receive multiple memory regions from remote endpoint in a single operation (bloc
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id_list` (list[int]): List of memory region IDs from register
+- `mr_id_list` (list[int]): List of memory region IDs (Tensor IDs) from registered tensors
 - `ptr_list` (list[int]): List of pointers to buffers for received data
 - `size_list` (list[int]): List of sizes in bytes for each memory region
 - `num_iovs` (int): Number of I/O vectors (length of the lists)
 
@@ -518,7 +498,7 @@ Send data to remote endpoint asynchronously (non-blocking).
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID from register
+- `mr_id` (int): Memory region ID (Tensor ID) from a registered tensor
 - `ptr` (int): Pointer to data to send
 - `size` (int): Number of bytes to send
 
@@ -533,7 +513,7 @@ Receive data from remote endpoint asynchronously (non-blocking).
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID from register
+- `mr_id` (int): Memory region ID (Tensor ID) from a registered tensor
 - `ptr` (int): Pointer to buffer for received data
 - `size` (int): Exact number of bytes to receive
 
@@ -562,7 +542,7 @@ Read data from remote endpoint using one-sided RDMA READ operation (blocking).
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID of remote data to read
+- `mr_id` (int): Memory region ID (Tensor ID) of remote data to read
 - `dst` (int): Pointer to local destination buffer
 - `size` (int): Number of bytes to read
 - `slot_item` (FifoItem): Slot item for RDMA operation coordination (contains the remote address to read from)
@@ -577,7 +557,7 @@ Read data from remote endpoint using one-sided RDMA READ operation asynchronousl
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID of remote data to read
+- `mr_id` (int): Memory region ID (Tensor ID) of remote data to read
 - `dst` (int): Pointer to local destination buffer
 - `size` (int): Number of bytes to read
 - `slot_item` (FifoItem): Slot item for RDMA operation coordination (contains the remote address to read from)
@@ -593,7 +573,7 @@ Read multiple memory regions from remote endpoint using one-sided RDMA READ oper
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id_list` (list[int]): List of memory region IDs of remote data to read
+- `mr_id_list` (list[int]): List of memory region IDs (Tensor IDs) of remote data to read
 - `dst_list` (list[int]): List of pointers to local destination buffers
 - `size_list` (list[int]): List of sizes in bytes for each memory region
 - `slot_item_list` (list[FifoItem]): List of slot items for RDMA operation coordination (contains the remote address to read from)
@@ -609,7 +589,7 @@ Advertise memory region information to remote endpoint for one-sided RDMA operat
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID to advertise
+- `mr_id` (int): Memory region ID (Tensor ID) to advertise
 - `addr` (int): Pointer to the memory region
 - `len` (int): Size of the memory region in bytes
 - `out_buf` (str): Output buffer to store advertisement metadata
@@ -624,7 +604,7 @@ Advertise multiple memory regions to remote endpoint for one-sided RDMA operatio
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id_list` (list[int]): List of memory region IDs to advertise
+- `mr_id_list` (list[int]): List of memory region IDs (Tensor IDs) to advertise
 - `addr_list` (list[int]): List of pointers to memory regions
 - `len_list` (list[int]): List of sizes in bytes for each memory region
 - `out_buf_list` (list[str]): List of output buffers to store advertisement metadata
@@ -640,7 +620,7 @@ Write data to remote endpoint using one-sided RDMA WRITE operation (blocking).
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID of remote destination
+- `mr_id` (int): Memory region ID (Tensor ID) of remote destination
 - `src` (int): Pointer to local buffer
 - `size` (int): Number of bytes to write
 - `slot_item` (FifoItem): Slot item for RDMA operation coordination (contains the remote address to write to)
@@ -655,7 +635,7 @@ Write data to remote endpoint using one-sided RDMA WRITE operation asynchronousl
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id` (int): Memory region ID of remote destination
+- `mr_id` (int): Memory region ID (Tensor ID) of remote destination
 - `src` (int): Pointer to local buffer
 - `size` (int): Number of bytes to write
 - `slot_item` (FifoItem): Slot item for RDMA operation coordination (contains the remote address to write to)
@@ -671,7 +651,7 @@ Write multiple memory regions to remote endpoint using one-sided RDMA WRITE oper
 
 **Parameters:**
 - `conn_id` (int): Connection ID from connect/accept
-- `mr_id_list` (list[int]): List of memory region IDs of remote destinations
+- `mr_id_list` (list[int]): List of memory region IDs (Tensor IDs) of remote destinations
 - `src_list` (list[int]): List of pointers to local buffers
 - `size_list` (list[int]): List of sizes in bytes for each memory region
 - `slot_item_list` (list[FifoItem]): List of slot items for RDMA operation coordination (contains the remote address to write to)
diff --git a/p2p/benchmarks/benchmark_uccl.py b/p2p/benchmarks/benchmark_uccl.py
index 61554e655..9a76b7f15 100644
--- a/p2p/benchmarks/benchmark_uccl.py
+++ b/p2p/benchmarks/benchmark_uccl.py
@@ -13,6 +13,7 @@
 
 try:
     from uccl import p2p
+    from uccl.utils import get_tensor_id_by_tensor, create_tensor
 except ImportError as exc:
     sys.stderr.write("Failed to import p2p\n")
     raise
@@ -26,16 +27,16 @@ def _make_buffer(size_bytes: int, device: str, gpu_idx: int):
     if device == "gpu":
         dtype = torch.float32 if size_bytes >= 4 else torch.uint8
         n_elems = size_bytes // dtype.itemsize
-        buf = torch.ones(n_elems, dtype=dtype).cuda()
-        assert buf.device.type == "cuda"
-        assert buf.is_contiguous()
-        ptr = buf.data_ptr()
+        tensor, t_id = create_tensor((n_elems,), dtype=dtype, device=f"cuda:{gpu_idx}")
+        assert tensor.device.type == "cuda"
+        assert tensor.is_contiguous()
+        ptr = tensor.data_ptr()
     else:  # cpu
         dtype = np.float32 if size_bytes >= 4 else np.uint8
         n_elems = size_bytes // dtype.itemsize
-        buf = np.ones(n_elems, dtype=dtype)
-        ptr = buf.ctypes.data
-    return buf, ptr
+        tensor, t_id = create_tensor((n_elems,), dtype=torch.float32 if size_bytes >= 4 else torch.uint8, device="cpu")
+        ptr = tensor.data_ptr()
+    return tensor, ptr, t_id
 
 
 def _pretty_size(num_bytes: int) -> str:
@@ -60,9 +61,9 @@ def _run_server(args, ep, remote_metadata):
     data_ptr_v = []
     size_v = []
     for _ in range(args.num_kvblocks):
-        buf, ptr = _make_buffer(size_per_block, args.device, args.local_gpu_idx)
-        ok, mr_id = ep.reg(ptr, size_per_block)
-        assert ok, "[Server] register failed"
+        buf, ptr, mr_id = _make_buffer(
+            size_per_block, args.device, args.local_gpu_idx
+        )
         buf_v.append(buf)
         mr_id_v.append(mr_id)
         data_ptr_v.append(ptr)
@@ -160,9 +161,9 @@ def _run_client(args, ep, remote_metadata):
     data_ptr_v = []
     size_v = []
     for _ in range(args.num_kvblocks):
-        buf, ptr = _make_buffer(size_per_block, args.device, args.local_gpu_idx)
-        ok, mr_id = ep.reg(ptr, size_per_block)
-        assert ok, "[Client] register failed"
+        buf, ptr, mr_id = _make_buffer(
+            size_per_block, args.device, args.local_gpu_idx
+        )
         buf_v.append(buf)
         mr_id_v.append(mr_id)
data_ptr_v.append(ptr) @@ -257,14 +258,10 @@ def _run_server_dual(args, ep, remote_metadata): print(f"[Server] Connected to {ip}:{port} (GPU {r_gpu}) conn_id={conn_id2}") for size in args.sizes: - buf, ptr = _make_buffer(size, args.device, args.local_gpu_idx) - ok, mr_id = ep.reg(ptr, size) - assert ok, "[Server] register failed" + buf, ptr, mr_id = _make_buffer(size, args.device, args.local_gpu_idx) # ep.recv(conn_id, mr_id, ptr, size) - buf2, ptr2 = _make_buffer(size, args.device, args.local_gpu_idx) - ok, mr_id2 = ep.reg(ptr2, size) - assert ok, "[Server] register failed" + buf2, ptr2, mr_id2 = _make_buffer(size, args.device, args.local_gpu_idx) # ep.send(conn_id, mr_id2, ptr2, size) start = time.perf_counter() @@ -314,14 +311,10 @@ def _run_client_dual(args, ep, remote_metadata): print(f"[Client] Accept from {r_ip} (GPU {r_gpu2}) conn_id={conn_id2}") for size in args.sizes: - buf, ptr = _make_buffer(size, args.device, args.local_gpu_idx) - ok, mr_id = ep.reg(ptr, size) - assert ok, "[Client] register failed" + buf, ptr, mr_id = _make_buffer(size, args.device, args.local_gpu_idx) # ep.send(conn_id, mr_id, ptr, size) - buf2, ptr2 = _make_buffer(size, args.device, args.local_gpu_idx) - ok, mr_id2 = ep.reg(ptr2, size) - assert ok, "[Client] register failed" + buf2, ptr2, mr_id2 = _make_buffer(size, args.device, args.local_gpu_idx) # ep.recv(conn_id, mr_id2, ptr2, size) start = time.perf_counter() @@ -366,7 +359,7 @@ def _run_server_ipc(args, ep): for size in args.sizes: # Allocate receive buffer - no memory registration needed for IPC - buf, ptr = _make_buffer(size, args.device, args.local_gpu_idx) + buf, ptr, ipc_id = _make_buffer(size, args.device, args.local_gpu_idx) # Warm-up transfer if args.async_api: @@ -413,7 +406,7 @@ def _run_client_ipc(args, ep, remote_gpu_idx): for size in args.sizes: # Allocate send buffer - no memory registration needed for IPC - buf, ptr = _make_buffer(size, args.device, args.local_gpu_idx) + buf, ptr, ipc_id = _make_buffer(size, args.device, args.local_gpu_idx) # Warm-up transfer if args.async_api: diff --git a/p2p/benchmarks/benchmark_uccl_collective.py b/p2p/benchmarks/benchmark_uccl_collective.py index de2c242ba..5105095a3 100644 --- a/p2p/benchmarks/benchmark_uccl_collective.py +++ b/p2p/benchmarks/benchmark_uccl_collective.py @@ -18,12 +18,15 @@ import os from uccl import collective +from uccl.utils import create_tensor def _make_buffer(size_bytes: int): """Allocate a contiguous GPU tensor of *size_bytes* and return it.""" n_elems = size_bytes // 4 # float32 - tensor = torch.ones(n_elems, dtype=torch.float32).cuda() + tensor, _ = create_tensor( + (n_elems,), dtype=torch.float32, device=f"cuda:{torch.cuda.current_device()}" + ) assert tensor.is_contiguous() assert tensor.device.type == "cuda" return tensor @@ -49,9 +52,6 @@ def _run_server(args): for size in args.sizes: tensor = _make_buffer(size) - # Register tensor for efficient memory access - collective.register_tensor(tensor) - # Warm-up receive collective.recv(tensor, src=peer) @@ -85,9 +85,6 @@ def _run_client(args): tensor = _make_buffer(size) tensor.fill_(size) - # Register tensor for efficient memory access - collective.register_tensor(tensor) - # Warm-up send collective.send(tensor, dst=peer) @@ -113,9 +110,6 @@ def _run_async_server(args): for size in args.sizes: tensor = _make_buffer(size) - # Register tensor for efficient memory access - collective.register_tensor(tensor) - # Warm-up req = collective.irecv(tensor, src=peer) collective.wait(req) @@ -143,9 +137,6 @@ def 
_run_async_client(args): for size in args.sizes: tensor = _make_buffer(size) - # Register tensor for efficient memory access - collective.register_tensor(tensor) - # Warm-up req = collective.isend(tensor, dst=peer) collective.wait(req) @@ -181,10 +172,6 @@ def _run_dual_benchmark(args): send_tensor = _make_buffer(size) recv_tensor = _make_buffer(size) - # Register tensors for efficient memory access - collective.register_tensor(send_tensor) - collective.register_tensor(recv_tensor) - # Warm-up: simultaneous send and receive send_req = collective.isend(send_tensor, dst=peer) recv_req = collective.irecv(recv_tensor, src=peer) @@ -242,10 +229,6 @@ def _run_ring_benchmark(args): # Fill send tensor with rank-specific data for verification send_tensor.fill_(size) - # Register tensors for efficient memory access - collective.register_tensor(send_tensor) - collective.register_tensor(recv_tensor) - # Warm-up send_req = collective.isend(send_tensor, dst=dst_rank) recv_req = collective.irecv(recv_tensor, src=src_rank) diff --git a/p2p/benchmarks/benchmark_uccl_read.py b/p2p/benchmarks/benchmark_uccl_read.py index 6ba2231b8..a406f9cd0 100644 --- a/p2p/benchmarks/benchmark_uccl_read.py +++ b/p2p/benchmarks/benchmark_uccl_read.py @@ -11,23 +11,22 @@ try: from uccl import p2p + from uccl.utils import get_tensor_id_by_tensor, create_tensor except ImportError: sys.stderr.write("Failed to import p2p\n") raise # parse_metadata is now provided by the C++ layer via p2p.Endpoint.parse_metadata() - - def _make_buffer(n_bytes: int, device: str, gpu: int): n = n_bytes // 4 if device == "gpu": - buf = torch.ones(n, dtype=torch.float32, device=f"cuda:{gpu}") - ptr = buf.data_ptr() + tensor, t_id = create_tensor((n,), dtype=torch.float32, device=f"cuda:{gpu}") + ptr = tensor.data_ptr() else: - buf = torch.ones(n, dtype=torch.float32, pin_memory=True) - ptr = buf.data_ptr() - return buf, ptr + tensor, t_id = create_tensor((n,), dtype=torch.float32, device="cpu") + ptr = tensor.data_ptr() + return tensor, ptr, t_id def _pretty(num: int): @@ -51,9 +50,9 @@ def _run_server_read(args, ep, remote_metadata): mr_id_v = [] size_v = [] for _ in range(args.num_iovs): - buf, ptr = _make_buffer(size_per_block, args.device, args.local_gpu_idx) - ok, mr_id = ep.reg(ptr, size_per_block) - assert ok + buf, ptr, mr_id = _make_buffer( + size_per_block, args.device, args.local_gpu_idx + ) buf_v.append(buf) ptr_v.append(ptr) mr_id_v.append(mr_id) @@ -83,9 +82,9 @@ def _run_client_recv(args, ep, remote_metadata): size_v = [] fifo_blob_v = [] for _ in range(args.num_iovs): - buf, ptr = _make_buffer(size_per_block, args.device, args.local_gpu_idx) - ok, mr_id = ep.reg(ptr, size_per_block) - assert ok + buf, ptr, mr_id = _make_buffer( + size_per_block, args.device, args.local_gpu_idx + ) buf_v.append(buf) ptr_v.append(ptr) mr_id_v.append(mr_id) diff --git a/p2p/benchmarks/benchmark_uccl_transfer.py b/p2p/benchmarks/benchmark_uccl_transfer.py index 94bd3f63a..0ccdb9481 100644 --- a/p2p/benchmarks/benchmark_uccl_transfer.py +++ b/p2p/benchmarks/benchmark_uccl_transfer.py @@ -7,6 +7,7 @@ import os from uccl import p2p from uccl.transfer import TransferManager +from uccl.utils import create_tensor # UCCL P2P read requires RC mode, as RDMA UC does not support one-sided read. 
os.environ["UCCL_RCMODE"] = "1" @@ -17,12 +18,12 @@ def _make_buffer(n_bytes: int, device: str, gpu: int): n = n_bytes // 4 if device == "gpu": - buf = torch.ones(n, dtype=torch.float32, device=f"cuda:{gpu}") - ptr = buf.data_ptr() + tensor, _ = create_tensor((n,), dtype=torch.float32, device=f"cuda:{gpu}") + ptr = tensor.data_ptr() else: - buf = torch.ones(n, dtype=torch.float32, pin_memory=True) - ptr = buf.data_ptr() - return buf, ptr + tensor, _ = create_tensor((n,), dtype=torch.float32, device="cpu") + ptr = tensor.data_ptr() + return tensor, ptr def _pretty(num: int): diff --git a/p2p/benchmarks/benchmark_uccl_write.py b/p2p/benchmarks/benchmark_uccl_write.py index 83448d3bd..b1f96dae3 100644 --- a/p2p/benchmarks/benchmark_uccl_write.py +++ b/p2p/benchmarks/benchmark_uccl_write.py @@ -11,23 +11,22 @@ try: from uccl import p2p + from uccl.utils import get_tensor_id_by_tensor, create_tensor except ImportError: sys.stderr.write("Failed to import p2p\n") raise # parse_metadata is now provided by the C++ layer via p2p.Endpoint.parse_metadata() - - def _make_buffer(n_bytes: int, device: str, gpu: int): n = n_bytes // 4 if device == "gpu": - buf = torch.ones(n, dtype=torch.float32, device=f"cuda:{gpu}") - ptr = buf.data_ptr() + tensor, t_id = create_tensor((n,), dtype=torch.float32, device=f"cuda:{gpu}") + ptr = tensor.data_ptr() else: - buf = torch.ones(n, dtype=torch.float32, pin_memory=True) - ptr = buf.data_ptr() - return buf, ptr + tensor, t_id = create_tensor((n,), dtype=torch.float32, device="cpu") + ptr = tensor.data_ptr() + return tensor, ptr, t_id def _pretty(num: int): @@ -51,9 +50,9 @@ def _run_server_adv(args, ep, remote_metadata): mr_id_v = [] size_v = [] for _ in range(args.num_iovs): - buf, ptr = _make_buffer(size_per_block, args.device, args.local_gpu_idx) - ok, mr_id = ep.reg(ptr, size_per_block) - assert ok + buf, ptr, mr_id = _make_buffer( + size_per_block, args.device, args.local_gpu_idx + ) buf_v.append(buf) ptr_v.append(ptr) mr_id_v.append(mr_id) @@ -83,9 +82,9 @@ def _run_client_write(args, ep, remote_metadata): size_v = [] fifo_blob_v = [] for _ in range(args.num_iovs): - buf, ptr = _make_buffer(size_per_block, args.device, args.local_gpu_idx) - ok, mr_id = ep.reg(ptr, size_per_block) - assert ok + buf, ptr, mr_id = _make_buffer( + size_per_block, args.device, args.local_gpu_idx + ) buf_v.append(buf) ptr_v.append(ptr) mr_id_v.append(mr_id) diff --git a/p2p/collective.py b/p2p/collective.py index 14452013a..8f129bef0 100644 --- a/p2p/collective.py +++ b/p2p/collective.py @@ -79,7 +79,6 @@ def __init__(self, num_cpus: int = 4, local_gpu_idx: Optional[int] = None): self.local_connections = ( None # array indexed by rank - True if local, False if remote ) - self.memory_regions: Dict[int, int] = {} # ptr -> mr_id self.initialized = False # check and setup fd limit and somaxconn for UDS @@ -319,55 +318,6 @@ def _get_buffer_info(self, tensor: torch.Tensor) -> Tuple[int, int]: size = tensor.numel() * tensor.element_size() return ptr, size - def _register_memory(self, ptr: int, size: int) -> int: - """Register memory and cache the memory region ID.""" - if ptr in self.memory_regions: - return self.memory_regions[ptr] - - ok, mr_id = self.ep.reg(ptr, size) - if not ok: - raise RuntimeError("Failed to register memory") - self.memory_regions[ptr] = mr_id - return mr_id - - def register_tensor(self, tensor: torch.Tensor): - """ - Register a tensor for efficient memory access. - - Note: Registration is only required for tensors used with remote (RDMA) connections. 
- Local IPC connections do not require memory registration. - - Args: - tensor: Tensor to register - """ - if not self.initialized: - raise RuntimeError("CollectiveContext not initialized. Call init() first.") - - ptr, size = self._get_buffer_info(tensor) - self._register_memory(ptr, size) - - def _deregister_memory(self, ptr: int): - """Deregister memory and remove the cached memory region ID.""" - if ptr not in self.memory_regions: - return - - mr_id = self.memory_regions[ptr] - self.ep.dereg(mr_id) - del self.memory_regions[ptr] - - def deregister_tensor(self, tensor: torch.Tensor): - """ - Deregister a tensor that was previously registered. - - Args: - tensor: Tensor to deregister - """ - if not self.initialized: - raise RuntimeError("CollectiveContext not initialized. Call init() first.") - - ptr, _ = self._get_buffer_info(tensor) - self._deregister_memory(ptr) - def send(self, tensor: torch.Tensor, dst: int): """ Send tensor to destination rank (synchronous). @@ -395,12 +345,7 @@ def send(self, tensor: torch.Tensor, dst: int): raise RuntimeError(f"Failed to initiate IPC send to rank {dst}") else: # Use RDMA for remote connection (requires memory registration) - if ptr not in self.memory_regions: - raise RuntimeError( - f"Tensor memory not registered for remote communication with rank {dst}. " - f"Call register_tensor() first for tensors used with remote ranks." - ) - mr_id = self.memory_regions[ptr] + mr_id = utils.get_tensor_id_by_tensor(tensor=tensor) ok = self.ep.send(conn_id, mr_id, ptr, size) if not ok: raise RuntimeError(f"Failed to initiate RDMA send to rank {dst}") @@ -432,12 +377,7 @@ def recv(self, tensor: torch.Tensor, src: int): raise RuntimeError(f"Failed to initiate IPC recv from rank {src}") else: # Use RDMA for remote connection (requires memory registration) - if ptr not in self.memory_regions: - raise RuntimeError( - f"Tensor memory not registered for remote communication with rank {src}. " - f"Call register_tensor() first for tensors used with remote ranks." - ) - mr_id = self.memory_regions[ptr] + mr_id = utils.get_tensor_id_by_tensor(tensor=tensor) ok = self.ep.recv(conn_id, mr_id, ptr, size) if not ok: raise RuntimeError(f"Failed to initiate RDMA recv from rank {src}") @@ -473,12 +413,7 @@ def isend(self, tensor: torch.Tensor, dst: int) -> int: return transfer_id else: # Use RDMA async for remote connection (requires memory registration) - if ptr not in self.memory_regions: - raise RuntimeError( - f"Tensor memory not registered for remote communication with rank {dst}. " - f"Call register_tensor() first for tensors used with remote ranks." - ) - mr_id = self.memory_regions[ptr] + mr_id = utils.get_tensor_id_by_tensor(tensor=tensor) ok, transfer_id = self.ep.send_async(conn_id, mr_id, ptr, size) if not ok: raise RuntimeError(f"Failed to initiate async RDMA send to rank {dst}") @@ -515,12 +450,7 @@ def irecv(self, tensor: torch.Tensor, src: int) -> int: return transfer_id else: # Use RDMA async for remote connection (requires memory registration) - if ptr not in self.memory_regions: - raise RuntimeError( - f"Tensor memory not registered for remote communication with rank {src}. " - f"Call register_tensor() first for tensors used with remote ranks." 
- ) - mr_id = self.memory_regions[ptr] + mr_id = utils.get_tensor_id_by_tensor(tensor=tensor) ok, transfer_id = self.ep.recv_async(conn_id, mr_id, ptr, size) if not ok: raise RuntimeError( @@ -573,7 +503,6 @@ def finalize(self): self.send_connections = None self.recv_connections = None self.local_connections = None - self.memory_regions.clear() self.ep = None self.initialized = False @@ -641,19 +570,9 @@ def test(transfer_id: int) -> bool: return get_collective().test(transfer_id) -def register_tensor(tensor: torch.Tensor): - """Register tensor using the default collective context.""" - get_collective().register_tensor(tensor) - - def finalize_collective(): """Finalize the default collective context.""" global _default_context if _default_context is not None: _default_context.finalize() _default_context = None - - -def deregister_tensor(tensor: torch.Tensor): - """Deregister tensor using the default collective context.""" - get_collective().deregister_tensor(tensor) diff --git a/p2p/engine.cc b/p2p/engine.cc index 83eeead22..291adb81c 100644 --- a/p2p/engine.cc +++ b/p2p/engine.cc @@ -17,7 +17,6 @@ #include #include -int const kMaxNumGPUs = 8; // Assume the local and remote GPUs have the same GPU-NIC mapping. uint8_t gpu_to_dev[kMaxNumGPUs] = {0}; std::once_flag glog_init_once; @@ -126,13 +125,30 @@ Endpoint::~Endpoint() { delete conn; } } + + std::vector mr_to_dereg; { - std::shared_lock lock(mr_mu_); - for (auto& [mr_id, mr] : mr_id_to_mr_) { - delete mr; + std::shared_lock lock(mr_mapping_mu_); + for (auto const& [id, mr_ptr] : mr_mapping_) { + if (mr_ptr && mr_ptr->mhandle_) { + mr_to_dereg.push_back(mr_ptr->mhandle_); + } } } + for (auto* mhandle : mr_to_dereg) { + dereg_mr(mhandle); + } + + { + std::unique_lock lock(mr_mapping_mu_); + mr_mapping_.clear(); + } + { + std::unique_lock lock(ipc_handle_mapping_mu_); + ipc_handle_mapping_.clear(); + } + if (!streams_.empty()) { GPU_RT_CHECK(gpuSetDevice(local_gpu_idx_)); for (auto s : streams_) @@ -285,72 +301,6 @@ bool Endpoint::accept(std::string& ip_addr, int& remote_gpu_idx, return true; } -bool Endpoint::reg(void const* data, size_t size, uint64_t& mr_id) { - mr_id = next_mr_id_.fetch_add(1); - - uccl::Mhandle* mhandle; - ep_->uccl_regmr(gpu_to_dev[local_gpu_idx_], const_cast(data), size, 0, - &mhandle); - if (mhandle->mr == nullptr) { - std::cerr << "[Endpoint::reg] Failed to register memory region, " - << "mhandle->mr is null\n"; - std::abort(); - } - { - std::unique_lock lock(mr_mu_); - mr_id_to_mr_[mr_id] = new MR{mr_id, mhandle}; - } - - return true; -} - -bool Endpoint::regv(std::vector const& data_v, - std::vector const& size_v, - std::vector& mr_id_v) { - if (data_v.size() != size_v.size()) - throw std::invalid_argument( - "[Endpoint::regv] data_v/size_v length mismatch"); - - size_t const n = data_v.size(); - mr_id_v.resize(n); - - { - std::unique_lock lock(mr_mu_); - mr_id_to_mr_.reserve(mr_id_to_mr_.size() + n); - } - - for (size_t i = 0; i < n; ++i) { - uint64_t id = next_mr_id_.fetch_add(1); - uccl::Mhandle* mhandle = nullptr; - - ep_->uccl_regmr(gpu_to_dev[local_gpu_idx_], const_cast(data_v[i]), - size_v[i], 0, &mhandle); - - if (mhandle == nullptr || mhandle->mr == nullptr) { - std::cerr << "[Endpoint::regv] registration failed at i=" << i << '\n'; - return false; - } - - { - std::unique_lock lock(mr_mu_); - mr_id_to_mr_[id] = new MR{id, mhandle}; - } - mr_id_v[i] = id; - } - return true; -} - -bool Endpoint::dereg(uint64_t mr_id) { - { - std::unique_lock lock(mr_mu_); - MR* mr = mr_id_to_mr_[mr_id]; - 
ep_->uccl_deregmr(mr->mhandle_); - delete mr; - mr_id_to_mr_.erase(mr_id); - } - return true; -} - bool Endpoint::send(uint64_t conn_id, uint64_t mr_id, void const* data, size_t size) { DCHECK(size <= 0xffffffff) << "size must be less than 4GB"; @@ -362,10 +312,7 @@ bool Endpoint::send(uint64_t conn_id, uint64_t mr_id, void const* data, } uccl::Mhandle* mhandle; - { - std::shared_lock lock(mr_mu_); - mhandle = mr_id_to_mr_[mr_id]->mhandle_; - } + mhandle = get_mr_handle_by_mem_id(mr_id); uccl::ucclRequest ureq[kMaxInflightChunks] = {}; bool done[kMaxInflightChunks] = {false}; @@ -419,10 +366,8 @@ bool Endpoint::recv(uint64_t conn_id, uint64_t mr_id, void* data, size_t size) { } uccl::Mhandle* mhandle; - { - std::shared_lock lock(mr_mu_); - mhandle = mr_id_to_mr_[mr_id]->mhandle_; - } + mhandle = get_mr_handle_by_mem_id(mr_id); + int size_int = static_cast(size); uccl::ucclRequest ureq[kMaxInflightChunks] = {}; @@ -540,6 +485,7 @@ bool Endpoint::sendv(uint64_t conn_id, std::vector mr_id_v, size_send_vec.reserve(estimated_ureq_max); mhandle_send_vec.reserve(estimated_ureq_max); + uccl::Mhandle* mhandle; for (int i = 0; i < num_iovs; i++) { void* cur_data = (void*)data_v[i]; size_t cur_size_expected = size_v[i]; @@ -550,15 +496,12 @@ bool Endpoint::sendv(uint64_t conn_id, std::vector mr_id_v, std::min(cur_size_expected - cur_size_post_send, kChunkSize); data_send_vec.push_back(cur_data); size_send_vec.push_back(chunk_size); - { - std::shared_lock lock(mr_mu_); - auto it = mr_id_to_mr_.find(mr_id_v[i]); - if (it == mr_id_to_mr_.end()) { - std::cerr << "[sendv] Error: Invalid mr_id " << mr_id_v[i] - << std::endl; - return false; - } - mhandle_send_vec.push_back(it->second->mhandle_); + mhandle = get_mr_handle_by_mem_id(mr_id_v[i]); + if (mhandle == nullptr) { + std::cerr << "[sendv] Error: Invalid mr_id " << mr_id_v[i] << std::endl; + return false; + } else { + mhandle_send_vec.push_back(mhandle); } cur_data += chunk_size; cur_size_post_send += chunk_size; @@ -636,6 +579,7 @@ bool Endpoint::recvv(uint64_t conn_id, std::vector mr_id_v, mhandle_recv_vec.reserve(estimated_ureq_max); mhandle_recv_ptr_vec.reserve(estimated_ureq_max); + uccl::Mhandle* mhandle; for (int i = 0; i < num_iovs; i++) { void* cur_data = data_v[i]; size_t cur_size_expected = size_v[i]; @@ -647,15 +591,12 @@ bool Endpoint::recvv(uint64_t conn_id, std::vector mr_id_v, data_recv_vec.push_back(cur_data); data_recv_ptr_vec.push_back(&data_recv_vec.back()); size_recv_vec.push_back(chunk_size); - { - std::shared_lock lock(mr_mu_); - auto it = mr_id_to_mr_.find(mr_id_v[i]); - if (it == mr_id_to_mr_.end()) { - std::cerr << "[recvv] Error: Invalid mr_id " << mr_id_v[i] - << std::endl; - return false; - } - mhandle_recv_vec.push_back(it->second->mhandle_); + mhandle = get_mr_handle_by_mem_id(mr_id_v[i]); + if (mhandle == nullptr) { + std::cerr << "[sendv] Error: Invalid mr_id " << mr_id_v[i] << std::endl; + return false; + } else { + mhandle_recv_vec.push_back(mhandle); } mhandle_recv_ptr_vec.push_back(&mhandle_recv_vec.back()); cur_data += chunk_size; @@ -709,7 +650,7 @@ bool Endpoint::read(uint64_t conn_id, uint64_t mr_id, void* dst, size_t size, DCHECK(size <= 0xffffffff) << "size must be < 4 GB"; auto* conn = conn_id_to_conn_[conn_id]; - auto* mhandle = mr_id_to_mr_[mr_id]->mhandle_; + auto* mhandle = get_mr_handle_by_mem_id(mr_id); uccl::ucclRequest ureq[kMaxInflightChunks] = {}; uccl::FifoItem curr_slot_item[kMaxInflightChunks] = {}; @@ -866,7 +807,7 @@ bool Endpoint::readv(uint64_t conn_id, std::vector mr_id_v, size_t 
cur_size_expected = size_v[i]; size_t cur_size_post_read = 0; uccl::FifoItem base_slot_item = slot_item_v[i]; - auto mhandle = mr_id_to_mr_[mr_id_v[i]]->mhandle_; + auto mhandle = get_mr_handle_by_mem_id(mr_id_v[i]); while (cur_size_post_read < cur_size_expected) { size_t chunk_size = @@ -974,7 +915,7 @@ bool Endpoint::writev(uint64_t conn_id, std::vector mr_id_v, size_t cur_size_expected = size_v[i]; size_t cur_size_post_write = 0; uccl::FifoItem base_slot_item = slot_item_v[i]; - auto mhandle = mr_id_to_mr_[mr_id_v[i]]->mhandle_; + auto mhandle = get_mr_handle_by_mem_id(mr_id_v[i]); while (cur_size_post_write < cur_size_expected) { size_t chunk_size = @@ -1043,7 +984,7 @@ bool Endpoint::write(uint64_t conn_id, uint64_t mr_id, void* src, size_t size, DCHECK(size <= 0xffffffff) << "size must be < 4 GB"; auto* conn = conn_id_to_conn_[conn_id]; - auto* mhandle = mr_id_to_mr_[mr_id]->mhandle_; + auto* mhandle = get_mr_handle_by_mem_id(mr_id); uccl::ucclRequest ureq[kMaxInflightChunks] = {}; uccl::FifoItem curr_slot_item[kMaxInflightChunks] = {}; @@ -1103,7 +1044,8 @@ bool Endpoint::write(uint64_t conn_id, uint64_t mr_id, void* src, size_t size, bool Endpoint::advertise(uint64_t conn_id, uint64_t mr_id, void* addr, size_t len, char* out_buf) { auto* conn = conn_id_to_conn_[conn_id]; - auto mhandle = mr_id_to_mr_[mr_id]->mhandle_; + auto mhandle = get_mr_handle_by_mem_id(mr_id); + uccl::ucclRequest req_data; if (ep_->prepare_fifo_metadata( static_cast(conn->uccl_conn_id_.context), &mhandle, @@ -1117,7 +1059,7 @@ bool Endpoint::advertisev(uint64_t conn_id, std::vector mr_id_v, std::vector out_buf_v, size_t num_iovs) { auto* conn = conn_id_to_conn_[conn_id]; for (size_t i = 0; i < num_iovs; ++i) { - auto mhandle = mr_id_to_mr_[mr_id_v[i]]->mhandle_; + auto mhandle = get_mr_handle_by_mem_id(mr_id_v[i]); if (ep_->prepare_fifo_metadata( static_cast(conn->uccl_conn_id_.context), &mhandle, addr_v[i], len_v[i], out_buf_v[i]) == -1) { diff --git a/p2p/engine.h b/p2p/engine.h index dfc913177..7ea852e92 100644 --- a/p2p/engine.h +++ b/p2p/engine.h @@ -1,5 +1,6 @@ #pragma once +#include "tensor.h" #include "transport.h" #include "util/gpu_rt.h" #include "util/jring.h" @@ -20,10 +21,8 @@ namespace py = pybind11; extern thread_local bool inside_python; -struct MR { - uint64_t mr_id_; - uccl::Mhandle* mhandle_; -}; +int const kMaxNumGPUs = 8; +extern uint8_t gpu_to_dev[kMaxNumGPUs]; struct Conn { uint64_t conn_id_; @@ -107,13 +106,6 @@ class Endpoint { */ bool accept(std::string& ip_addr, int& remote_gpu_idx, uint64_t& conn_id); - /*Register the data with a specific interface. */ - bool reg(void const* data, size_t size, uint64_t& mr_id); - - bool regv(std::vector const& data_v, - std::vector const& size_v, std::vector& mr_id_v); - bool dereg(uint64_t mr_id); - /*Send data to the remote server. Blocking. */ bool send(uint64_t conn_id, uint64_t mr_id, void const* data, size_t size); @@ -273,14 +265,14 @@ class Endpoint { uccl::RDMAEndpoint* ep_; std::atomic next_conn_id_ = 0; - std::atomic next_mr_id_ = 0; + // std::atomic next_mr_id_ = 0; std::atomic next_transfer_id_ = 0; // Accessed by both app thread and proxy thread. mutable std::shared_mutex conn_mu_; std::unordered_map conn_id_to_conn_; - mutable std::shared_mutex mr_mu_; - std::unordered_map mr_id_to_mr_; + // mutable std::shared_mutex mr_mu_; + // std::unordered_map mr_id_to_mr_; // Single-threaded. 
std::unordered_map rank2conn_; diff --git a/p2p/pybind_engine.cc b/p2p/pybind_engine.cc index 9b673f659..81d044911 100644 --- a/p2p/pybind_engine.cc +++ b/p2p/pybind_engine.cc @@ -1,4 +1,5 @@ #include "engine.h" +#include "tensor.h" #include #include #include @@ -15,6 +16,20 @@ PYBIND11_MODULE(p2p, m) { m.def("get_oob_ip", &get_oob_ip, "Get the OOB IP address"); + m.def( + "reg_mem", + [](uint64_t addr, size_t size, bool is_device, int gpu_id) { + uint64_t mem_id; + reg_mem(reinterpret_cast(addr), size, mem_id, is_device, gpu_id); + return mem_id; + }, + "Reg the memory with RDMA capabilities", py::arg("addr"), py::arg("size"), + py::arg("is_device"), py::arg("gpu_id")); + + m.def( + "dereg_mem", [](uint64_t mem_id) { dereg_mem(mem_id); }, + "Dereg the memory associated RDMA resources", py::arg("mem_id")); + // Endpoint class binding py::class_(m, "Endpoint") .def(py::init(), "Create a new Engine instance", @@ -69,55 +84,6 @@ PYBIND11_MODULE(p2p, m) { conn_id); }, "Accept an incoming connection") - .def( - "reg", - [](Endpoint& self, uint64_t ptr, size_t size) { - uint64_t mr_id; - bool success; - { - py::gil_scoped_release release; - InsidePythonGuard guard; - success = - self.reg(reinterpret_cast(ptr), size, mr_id); - } - return py::make_tuple(success, mr_id); - }, - "Register a data buffer", py::arg("ptr"), py::arg("size")) - .def( - "regv", - [](Endpoint& self, std::vector const& ptrs, - std::vector const& sizes) { - if (ptrs.size() != sizes.size()) - throw std::runtime_error("ptrs and sizes must match"); - - std::vector data_v; - data_v.reserve(ptrs.size()); - for (auto p : ptrs) - data_v.push_back(reinterpret_cast(p)); - - std::vector mr_ids; - bool ok; - { - py::gil_scoped_release release; - InsidePythonGuard guard; - ok = self.regv(data_v, sizes, mr_ids); - } - return py::make_tuple(ok, py::cast(mr_ids)); - }, - py::arg("ptrs"), py::arg("sizes"), - "Batch-register multiple memory regions and return [ok, mr_id_list]") - .def( - "dereg", - [](Endpoint& self, uint64_t mr_id) { - bool ok; - { - py::gil_scoped_release release; - InsidePythonGuard guard; - ok = self.dereg(mr_id); - } - return ok; - }, - "Deregister a memory region", py::arg("mr_id")) .def( "send", [](Endpoint& self, uint64_t conn_id, uint64_t mr_id, uint64_t ptr, diff --git a/p2p/tensor.cc b/p2p/tensor.cc new file mode 100644 index 000000000..0f8b09dcd --- /dev/null +++ b/p2p/tensor.cc @@ -0,0 +1,182 @@ +#include "tensor.h" +#include +#include +#include + +std::shared_mutex mr_mapping_mu_; +std::unordered_map> mr_mapping_; +std::shared_mutex ipc_handle_mapping_mu_; +std::unordered_map> ipc_handle_mapping_; +std::atomic next_mem_id_{0}; + +int reg_dma_mr(uccl::FactoryDevice* dev, void* addr, size_t len, int offset, + int fd, struct uccl::Mhandle** mhandle) { + bool ib_relaxed_ordering_enabled_ = uccl::ncclIbRelaxedOrderingCapable(); + unsigned int flags = + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; + if (ib_relaxed_ordering_enabled_) flags |= IBV_ACCESS_RELAXED_ORDERING; + + *mhandle = new uccl::Mhandle(); + (*mhandle)->mr = + ibv_reg_dmabuf_mr(dev->pd, offset, len, (uint64_t)addr, fd, flags); + if (!(*mhandle)->mr) { + delete *mhandle; + *mhandle = nullptr; + return -1; + } + return 0; +} + +int reg_mr(uccl::FactoryDevice* dev, void* addr, size_t len, + struct uccl::Mhandle** mhandle) { + bool ib_relaxed_ordering_enabled_ = uccl::ncclIbRelaxedOrderingCapable(); + unsigned int flags = + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; + if 
(ib_relaxed_ordering_enabled_) flags |= IBV_ACCESS_RELAXED_ORDERING; + + *mhandle = new uccl::Mhandle(); + if (ib_relaxed_ordering_enabled_) { + // try iova2 variant if available + (*mhandle)->mr = + ibv_reg_mr_iova2(dev->pd, addr, len, (uint64_t)addr, flags); + } else { + (*mhandle)->mr = ibv_reg_mr(dev->pd, addr, len, flags); + } + + if (!(*mhandle)->mr) { + std::cerr << "ibv_reg_mr failed (" << strerror(errno) << "), len=" << len + << " addr=" << addr << "\n"; + delete *mhandle; + *mhandle = nullptr; + return -1; + } + return 0; +} + +void dereg_mr(struct uccl::Mhandle* mhandle) { + if (!mhandle) return; + if (mhandle->mr) { + ibv_dereg_mr(mhandle->mr); + mhandle->mr = nullptr; + } + delete mhandle; +} + +int get_ipc_handle(void* addr, struct IPCMemHandle* ipchandle) { + if (!ipchandle) return -1; + GPU_RT_CHECK( + gpuIpcGetMemHandle(&ipchandle->handle, reinterpret_cast(addr))); + return 0; +} + +int open_ipc_handle(void* addr, struct IPCMemHandle* ipchandle) { + if (!ipchandle) return -1; + GPU_RT_CHECK(gpuIpcOpenMemHandle(&addr, ipchandle->handle, + gpuIpcMemLazyEnablePeerAccess)); + return 0; +} + +void reg_mem(void* addr, size_t size, uint64_t& mem_id, bool is_device, + int gpu_id) { + if (gpu_id < 0 || gpu_id >= kMaxNumGPUs) { + throw std::invalid_argument("[reg_mem] Invalid gpu_id: " + + std::to_string(gpu_id)); + } + + if (gpu_to_dev[gpu_id] == 0) { + throw std::runtime_error( + "You must initialize UCCL collective context or Endpoint first"); + } + + if (is_device) { + GPU_RT_CHECK(gpuSetDevice(gpu_id)); + } + + uccl::FactoryDevice* factory_dev = + uccl::RDMAFactory::get_factory_dev(gpu_to_dev[gpu_id]); + + mem_id = next_mem_id_.fetch_add(1); + // MR + std::unique_ptr mr = std::make_unique(); + int ret = reg_mr(factory_dev, addr, size, &mr->mhandle_); + if (ret != 0) { + throw std::runtime_error("MR registration failed"); + } + mr->mr_id_ = mem_id; + { + std::unique_lock lock(mr_mapping_mu_); + mr_mapping_[mr->mr_id_] = std::move(mr); + } + + // IPC + if (is_device) { + auto addr_aligned = + reinterpret_cast(addr) & ~(kIpcAlignment - 1); + auto addr_offset = reinterpret_cast(addr) - addr_aligned; + // std::cout << "[reg_mem] Aligned pointer=" << addr_aligned << std::endl; + + std::unique_ptr ipc = std::make_unique(); + ret = get_ipc_handle(reinterpret_cast(addr_aligned), ipc.get()); + if (ret != 0) { + // Rollback MR + std::unique_lock lock(mr_mapping_mu_); + auto it = mr_mapping_.find(mem_id); + if (it != mr_mapping_.end()) { + if (it->second->mhandle_) { + dereg_mr(it->second->mhandle_); + } + mr_mapping_.erase(it); + } + throw std::runtime_error( + "[reg_mem] IPC handle creation failed for device memory"); + } + ipc->size = size; + ipc->offset = addr_offset; + ipc->id = mem_id; + { + std::unique_lock lock(ipc_handle_mapping_mu_); + ipc_handle_mapping_[ipc->id] = std::move(ipc); + } + } +} + +void dereg_mem(uint64_t mem_id) { + { + std::unique_lock lock(mr_mapping_mu_); + auto it = mr_mapping_.find(mem_id); + if (it != mr_mapping_.end()) { + MR* mr = it->second.get(); + if (mr->mhandle_) { + dereg_mr(mr->mhandle_); + mr->mhandle_ = nullptr; + } + mr_mapping_.erase(it); + } + } + { + std::unique_lock lock(ipc_handle_mapping_mu_); + auto it = ipc_handle_mapping_.find(mem_id); + if (it != ipc_handle_mapping_.end()) { + ipc_handle_mapping_.erase(it); + } + } +} + +uccl::Mhandle* get_mr_handle_by_mem_id(uint64_t mem_id) { + std::shared_lock lock(mr_mapping_mu_); + auto it = mr_mapping_.find(mem_id); + if (it != mr_mapping_.end()) { + return it->second->mhandle_; + } + return 
nullptr; +} + +gpuIpcMemHandle_t get_ipc_mem_handle_by_mem_id(uint64_t mem_id) { + std::shared_lock lock(ipc_handle_mapping_mu_); + auto it = ipc_handle_mapping_.find(mem_id); + if (it != ipc_handle_mapping_.end()) { + return it->second->handle; + } + gpuIpcMemHandle_t invalid = {}; + return invalid; +} \ No newline at end of file diff --git a/p2p/tensor.h b/p2p/tensor.h new file mode 100644 index 000000000..652b38d60 --- /dev/null +++ b/p2p/tensor.h @@ -0,0 +1,54 @@ +#pragma once +#include "engine.h" +#include "rdma_io.h" +#include "transport.h" +#include "util/gpu_rt.h" +#include +#include +#include +#include + +static constexpr size_t kIpcAlignment = 1ul << 20; + +struct MR { + uint64_t mr_id_; + uccl::Mhandle* mhandle_; +}; + +struct IPCMemHandle { + uint64_t id; + gpuIpcMemHandle_t handle; + size_t size; + size_t offset; +}; + +// internal +extern std::shared_mutex mr_mapping_mu_; +extern std::unordered_map> mr_mapping_; + +extern std::shared_mutex ipc_handle_mapping_mu_; +extern std::unordered_map> + ipc_handle_mapping_; + +extern std::atomic next_mem_id_; + +int reg_dma_mr(uccl::FactoryDevice* dev, void* addr, size_t len, int offset, + int fd, struct uccl::Mhandle** mhandle); + +int reg_mr(uccl::FactoryDevice* dev, void* addr, size_t len, + struct uccl::Mhandle** mhandle); + +void dereg_mr(struct uccl::Mhandle* mhandle); + +int get_ipc_handle(void* addr, struct IPCMemHandle* ipchandle); + +int open_ipc_handle(void* addr, struct IPCMemHandle* ipchandle); + +// API: for pybind +void reg_mem(void* addr, size_t size, uint64_t& mem_id, bool is_device = true, + int gpu_id = 0); +void dereg_mem(uint64_t mem_id); + +// for engine Endpoint +uccl::Mhandle* get_mr_handle_by_mem_id(uint64_t mem_id); +gpuIpcMemHandle_t get_ipc_mem_handle_by_mem_id(uint64_t mem_id); \ No newline at end of file diff --git a/p2p/tests/test_engine_metadata.py b/p2p/tests/test_engine_metadata.py index b13049740..e800a0c16 100644 --- a/p2p/tests/test_engine_metadata.py +++ b/p2p/tests/test_engine_metadata.py @@ -14,6 +14,7 @@ try: from uccl import p2p + from uccl.utils import get_tensor_id_by_tensor, create_tensor print("✓ Successfully imported p2p") except ImportError as e: @@ -46,11 +47,10 @@ def server_process(q): f"Server accepted connection from {remote_ip_addr}, GPU {remote_gpu_idx}, conn_id={conn_id}" ) - tensor = torch.zeros(1024, dtype=torch.float32) + tensor, t_id = create_tensor((1024,), dtype=torch.float32, device=f"cpu") + mr_id = get_tensor_id_by_tensor(tensor=tensor) assert tensor.is_contiguous() - - success, mr_id = engine.reg(tensor.data_ptr(), tensor.numel() * 4) - assert success + assert mr_id == t_id success = engine.recv( conn_id, mr_id, tensor.data_ptr(), size=tensor.numel() * 8 @@ -74,11 +74,10 @@ def client_process(q): assert success print(f"Client connected successfully: conn_id={conn_id}") - tensor = torch.ones(1024, dtype=torch.float32) + tensor, t_id = create_tensor((1024,), dtype=torch.float32, device=f"cpu") + mr_id = get_tensor_id_by_tensor(tensor=tensor) assert tensor.is_contiguous() - - success, mr_id = engine.reg(tensor.data_ptr(), tensor.numel() * 4) - assert success + assert mr_id == t_id success = engine.send(conn_id, mr_id, tensor.data_ptr(), tensor.numel() * 4) assert success diff --git a/p2p/tests/test_engine_read.py b/p2p/tests/test_engine_read.py index 472d892b2..582146f3f 100644 --- a/p2p/tests/test_engine_read.py +++ b/p2p/tests/test_engine_read.py @@ -15,6 +15,7 @@ try: from uccl import p2p + from uccl.utils import get_tensor_id_by_tensor, create_tensor except ImportError 
as e: sys.stderr.write(f"Failed to import p2p: {e}\n") raise @@ -49,9 +50,10 @@ def server_proc(ep_meta_q, fifo_meta_q): assert ok, "connect failed" print(f"[Server] connected (conn_id={conn_id})") - tensor = torch.ones(1024, dtype=torch.float32, device="cuda:0") - ok, mr_id = ep.reg(tensor.data_ptr(), tensor.numel() * 4) - assert ok + tensor, t_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0") + mr_id = get_tensor_id_by_tensor(tensor=tensor) + assert tensor.is_contiguous() + assert mr_id == t_id fifo_meta = fifo_meta_q.recv() assert isinstance(fifo_meta, (bytes, bytearray)) and len(fifo_meta) == 64 @@ -72,12 +74,12 @@ def client_proc(ep_meta_q, fifo_meta_q): assert ok, "accept failed" print(f"[Client] accepted (conn_id={conn_id})") - tensor = torch.ones(1024, dtype=torch.float32, device="cuda:0") - + tensor, t_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0") print("data pointer hex", hex(tensor.data_ptr())) torch.cuda.synchronize() - ok, mr_id = ep.reg(tensor.data_ptr(), tensor.numel() * 4) - assert ok + mr_id = get_tensor_id_by_tensor(tensor=tensor) + assert mr_id == t_id + time.sleep(0.1) print("advertise data pointer hex", hex(tensor.data_ptr())) ok, fifo_blob = ep.advertise( diff --git a/p2p/tests/test_engine_send.py b/p2p/tests/test_engine_send.py index 8a99b455d..06cf1c441 100644 --- a/p2p/tests/test_engine_send.py +++ b/p2p/tests/test_engine_send.py @@ -15,6 +15,7 @@ try: from uccl import p2p + from uccl.utils import get_tensor_id_by_tensor, create_tensor print("✓ Successfully imported p2p") except ImportError as e: @@ -53,11 +54,10 @@ def server_process(meta_q): f"Server accepted connection from {remote_ip_addr}, GPU {remote_gpu_idx}, conn_id={conn_id}" ) - tensor = torch.zeros(1024, dtype=torch.float32) + tensor, t_id = create_tensor((1024,), dtype=torch.float32) + mr_id = get_tensor_id_by_tensor(tensor=tensor) assert tensor.is_contiguous() - - success, mr_id = engine.reg(tensor.data_ptr(), tensor.numel() * 4) - assert success + assert mr_id == t_id success = engine.recv( conn_id, mr_id, tensor.data_ptr(), size=tensor.numel() * 8 @@ -78,11 +78,10 @@ def client_process(meta_q): assert success print(f"Client connected successfully: conn_id={conn_id}") - tensor = torch.ones(1024, dtype=torch.float32) + tensor, t_id = create_tensor((1024,), dtype=torch.float32) + mr_id = get_tensor_id_by_tensor(tensor=tensor) assert tensor.is_contiguous() - - success, mr_id = engine.reg(tensor.data_ptr(), tensor.numel() * 4) - assert success + assert mr_id == t_id success = engine.send(conn_id, mr_id, tensor.data_ptr(), tensor.numel() * 4) assert success diff --git a/p2p/tests/test_engine_write.py b/p2p/tests/test_engine_write.py index 7c2a49654..eb76ca64a 100644 --- a/p2p/tests/test_engine_write.py +++ b/p2p/tests/test_engine_write.py @@ -15,6 +15,7 @@ try: from uccl import p2p + from uccl.utils import get_tensor_id_by_tensor, create_tensor except ImportError as e: sys.stderr.write(f"Failed to import p2p: {e}\n") raise @@ -49,9 +50,9 @@ def server_proc(ep_meta_q, fifo_meta_q): assert ok, "connect failed" print(f"[Server] connected (conn_id={conn_id})") - tensor = torch.ones(1024, dtype=torch.float32, device="cuda:0") - ok, mr_id = ep.reg(tensor.data_ptr(), tensor.numel() * 4) - assert ok + tensor, t_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0") + mr_id = get_tensor_id_by_tensor(tensor=tensor) + assert mr_id == t_id fifo_meta = fifo_meta_q.recv() assert isinstance(fifo_meta, (bytes, bytearray)) and len(fifo_meta) == 64 @@ -72,12 +73,12 @@ def 
client_proc(ep_meta_q, fifo_meta_q): assert ok, "accept failed" print(f"[Client] accepted (conn_id={conn_id})") - tensor = torch.ones(1024, dtype=torch.float32, device="cuda:0") - + tensor, t_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0") print("data pointer hex", hex(tensor.data_ptr())) torch.cuda.synchronize() - ok, mr_id = ep.reg(tensor.data_ptr(), tensor.numel() * 4) - assert ok + mr_id = get_tensor_id_by_tensor(tensor=tensor) + assert mr_id == t_id + time.sleep(0.1) print("advertise data pointer hex", hex(tensor.data_ptr())) ok, fifo_blob = ep.advertise( diff --git a/p2p/tests/test_tensor.py b/p2p/tests/test_tensor.py new file mode 100644 index 000000000..682015de3 --- /dev/null +++ b/p2p/tests/test_tensor.py @@ -0,0 +1,27 @@ +import sys +import torch +import gc + +try: + from uccl import utils, p2p + + print("✓ Successfully imported p2p") +except ImportError as e: + print(f"✗ Failed to import p2p: {e}") + print("Make sure to run 'make' first to build the module") + sys.exit(1) + +use_gpu_id = 0 +engine = p2p.Endpoint(local_gpu_idx=use_gpu_id, num_cpus=4) + +tensor, tensor_id = utils.create_tensor((2, 4, 2), torch.float32, f"cuda:{use_gpu_id}") +print(f"Allocate tensor: id={tensor_id}") +tensor2, tensor_id2 = utils.create_tensor((2,), torch.float32, "cpu") +print(f"Allocate tensor2: id={tensor_id2}") +t_id = utils.get_tensor_id_by_tensor(tensor) +print(f"Query tensor: id={tensor_id}") +# test auto free +del tensor +gc.collect() +# print(f"Try to free tensor manaual: id={t_id}") +# utils.free_tensor(tensor) diff --git a/p2p/transfer.py b/p2p/transfer.py index 620be4d3b..b2c86db69 100644 --- a/p2p/transfer.py +++ b/p2p/transfer.py @@ -1,6 +1,11 @@ import socket import torch -from .utils import create_socket_and_connect, send_obj, recv_obj +from .utils import ( + create_socket_and_connect, + send_obj, + recv_obj, + get_tensor_id_by_tensor, +) try: from . import p2p @@ -132,9 +137,7 @@ def register_transfer( assert tensor.is_contiguous() data = tensor.data_ptr() size = tensor.numel() * tensor.element_size() - - success, mr_id = self.ep.reg(data, size) - assert success, f"Failed to register tensor on GPU {self.local_gpu_idx}" + mr_id = get_tensor_id_by_tensor(tensor=tensor) conn_state = self.conn_table[conn_id] self.transfer_table[self.next_transfer_id] = self.TransferState( diff --git a/p2p/utils.py b/p2p/utils.py index 7b6e17290..f30a3503a 100644 --- a/p2p/utils.py +++ b/p2p/utils.py @@ -4,7 +4,14 @@ import time import struct import pickle -from typing import Any +from typing import Any, Tuple, Dict +import torch +import weakref + +try: + from . import p2p +except ImportError: + import p2p def set_files_limit(): @@ -108,3 +115,89 @@ def recv_obj(sock: socket.socket) -> Any: return None payload = _recv_exact(sock, length) return pickle.loads(payload) + + +_GLOBAL_TENSOR_IDS: Dict[int, int] = {} + + +def get_tensor_id_by_tensor(tensor: torch.Tensor): + if not tensor.is_contiguous(): + raise ValueError("Tensor must be contiguous") + ptr = tensor.data_ptr() + if ptr not in _GLOBAL_TENSOR_IDS: + raise RuntimeError( + f"Tensor memory not registered for communication. " + f"Call create_tensor() to create a tensor." 
+ ) + return _GLOBAL_TENSOR_IDS[ptr] + + +def _auto_free_tensor(ptr: int, tensor_id: int): + if ptr not in _GLOBAL_TENSOR_IDS: + return + # print(f"[Auto Free] ptr={ptr}, tensor_id={tensor_id}") + try: + p2p.dereg_mem(tensor_id) + finally: + _GLOBAL_TENSOR_IDS.pop(ptr, None) + + +def create_tensor( + shape: Tuple[int, ...], dtype: torch.dtype, device: str = "cuda:0" +) -> Tuple[torch.Tensor, int]: + """ + Create an empty tensor and register its memory (GPU or pinned CPU). + Automatically handles CUDA device parsing and memory registration. + """ + if device == "cpu": + gpu_id = 0 + use_cuda = False + elif device.startswith("cuda"): + parts = device.split(":") + if len(parts) == 2 and parts[1].isdigit(): + gpu_id = int(parts[1]) + elif len(parts) == 1: + gpu_id = torch.cuda.current_device() + else: + raise ValueError(f"Invalid CUDA device string: '{device}'") + use_cuda = True + else: + raise ValueError(f"Unsupported device type: '{device}'") + + tensor = torch.empty( + size=shape, + dtype=dtype, + device=device if use_cuda else "cpu", + pin_memory=not use_cuda, + ) + + addr = tensor.data_ptr() + size = tensor.numel() * tensor.element_size() + + if addr in _GLOBAL_TENSOR_IDS: + raise RuntimeError(f"Tensor at address {addr} is already registered.") + + tensor_id = p2p.reg_mem(addr, size, use_cuda, gpu_id) + if tensor_id < 0: + raise RuntimeError(f"Failed to register memory (tensor_id={tensor_id})") + + _GLOBAL_TENSOR_IDS[addr] = tensor_id + weakref.finalize(tensor, _auto_free_tensor, addr, tensor_id) + + return tensor, tensor_id + + +def free_tensor(tensor: torch.Tensor): + """ + Note that the GPU memory allocated for a tensor is automatically freed + when the tensor object is garbage collected. Manual intervention to free + GPU memory is generally not required unless immediate resource release + is necessary. + """ + ptr = tensor.data_ptr() + if ptr not in _GLOBAL_TENSOR_IDS: + print(f"Tensor at address {ptr} has been deregistered") + return + + tensor_id = _GLOBAL_TENSOR_IDS[ptr] + _auto_free_tensor(ptr, tensor_id)
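
For reviewers trying out the new registration flow end to end, here is a minimal usage sketch (assuming the `uccl` package built from this branch and one RDMA-capable GPU; connection setup via `connect()`/`accept()` is elided, so `conn_id` below is only a placeholder):

```python
import gc
import torch
from uccl import p2p
from uccl.utils import create_tensor, get_tensor_id_by_tensor, free_tensor

# reg_mem() requires the GPU-to-device mapping to be initialized, so construct
# the Endpoint (or the collective context) before allocating registered tensors.
endpoint = p2p.Endpoint(local_gpu_idx=0, num_cpus=4)

# create_tensor allocates the buffer (GPU here; pinned host memory for "cpu")
# and registers it, returning the tensor together with its memory-region ID.
tensor, mr_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0")

# The ID can be recovered later from the tensor itself.
assert get_tensor_id_by_tensor(tensor) == mr_id

# With an established connection, the ID replaces the old endpoint.reg() result:
#   endpoint.send(conn_id, mr_id, tensor.data_ptr(),
#                 tensor.numel() * tensor.element_size())

# Deregistration happens automatically when the tensor is garbage collected;
# free_tensor() releases the registration early if needed.
free_tensor(tensor)
del tensor
gc.collect()
```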