14 changes: 14 additions & 0 deletions docker/Dockerfile.rocm
@@ -43,6 +43,20 @@ RUN python${PY_VER} -m pip install --no-cache-dir build auditwheel pybind11
RUN ln -sf /usr/bin/python${PY_VER} /usr/local/bin/python3 && \
    ln -sf /usr/bin/python${PY_VER}-config /usr/local/bin/python3-config

# ───── Install PyTorch with ROCm support ─────
# PyTorch does not yet support ROCm 7.x
ARG BASE_IMAGE
RUN set -ex; \
    if [ "$BASE_IMAGE" = "rocm/dev-ubuntu-22.04:6.4.3-complete" ]; then \
        echo "Installing pip and PyTorch with ROCm 6.4 support..."; \
        apt-get update && apt-get install -y python3-pip; \
        python3 -m pip install --upgrade pip; \
        python3 -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/rocm6.4; \
        python3 -c "import torch; print('CUDA available:', torch.cuda.is_available())"; \
    else \
        echo "BASE_IMAGE='$BASE_IMAGE', skipping PyTorch installation."; \
    fi

WORKDIR /io

CMD ["bash"]
2 changes: 1 addition & 1 deletion p2p/Makefile
@@ -36,7 +36,7 @@ PLUGIN_SO := libuccl_engine.so
CAPI_SOURCE := uccl_engine.cc
CAPI_HEADER := uccl_engine.h
CAPI_OBJECT := $(CAPI_SOURCE:.cc=.o)
SOURCES := engine.cc pybind_engine.cc
SOURCES := engine.cc pybind_engine.cc tensor.cc
CORE_OBJECT := engine.o
OBJECTS := $(SOURCES:.cc=.o)

2 changes: 1 addition & 1 deletion p2p/Makefile.rocm
@@ -29,7 +29,7 @@ LDFLAGS = -L$(HIP_LIB) -lamdhip64 \

# Target and source files
TARGET := p2p$(PYEXT)
SOURCES := engine.cc pybind_engine.cc
SOURCES := engine.cc pybind_engine.cc tensor.cc
OBJECTS := $(SOURCES:.cc=.o)

# Default target
112 changes: 46 additions & 66 deletions p2p/README.md
@@ -285,31 +285,25 @@ if success:

```python
# Sender side
send_tensor = torch.ones(1024, dtype=torch.float32)
assert send_tensor.is_contiguous() # Ensure tensor is contiguous
from uccl.utils import create_tensor, get_tensor_id_by_tensor
send_tensor, mr_id = create_tensor((1024,), dtype=torch.float32, device="cpu")

# Register tensor for RDMA
success, mr_id = endpoint.reg(send_tensor.data_ptr(), send_tensor.numel() * 4)
assert success
# You can get mr_id later with get_tensor_id_by_tensor
mr_id = get_tensor_id_by_tensor(send_tensor)

# Send the tensor
success = endpoint.send(conn_id, mr_id, send_tensor.data_ptr(), send_tensor.numel() * 4)
assert success

# Receiver side
recv_tensor = torch.zeros(1024, dtype=torch.float32)
assert recv_tensor.is_contiguous()

# Register receive buffer
success, mr_id = endpoint.reg(recv_tensor.data_ptr(), recv_tensor.numel() * 4)
assert success
recv_tensor, mr_id = create_tensor((1024,), dtype=torch.float32, device="cpu")

# Receive the tensor
success = endpoint.recv(conn_id, mr_id, recv_tensor.data_ptr(), recv_tensor.numel() * 4)
assert success
```

### NumPy Array Transfer
### NumPy Array Transfer [TODO]

```python
import numpy as np
@@ -338,21 +332,13 @@ success = endpoint.recv(conn_id, recv_mr_id, recv_ptr, recv_data.nbytes)

```python
# Sender side - send multiple tensors at once
tensors = [
    torch.ones(512, dtype=torch.float32),
    torch.ones(1024, dtype=torch.float32),
    torch.ones(256, dtype=torch.float32)
]

# Ensure all tensors are contiguous
for tensor in tensors:
    assert tensor.is_contiguous()

# Register all tensors
shapes = [(512,), (1024,), (256,)]
tensors = []
mr_ids = []
for tensor in tensors:
    success, mr_id = endpoint.reg(tensor.data_ptr(), tensor.numel() * 4)
    assert success
for shape in shapes:
    tensor, mr_id = create_tensor(shape, dtype=torch.float32, device="cuda:0")
    assert tensor.is_contiguous()
    tensors.append(tensor)
    mr_ids.append(mr_id)

# Prepare data for vectorized send
@@ -365,18 +351,13 @@ success = endpoint.sendv(conn_id, mr_ids, ptr_list, size_list, num_iovs)
assert success

# Receiver side - receive multiple tensors at once
recv_tensors = [
    torch.zeros(512, dtype=torch.float32),
    torch.zeros(1024, dtype=torch.float32),
    torch.zeros(256, dtype=torch.float32)
]

# Register receive buffers
recv_tensors = []
recv_mr_ids = []
for tensor in recv_tensors:
    success, mr_id = endpoint.reg(tensor.data_ptr(), tensor.numel() * 4)
    assert success
    recv_mr_ids.append(mr_id)
for shape in shapes:
    recv_tensor, recv_mr_id = create_tensor(shape, dtype=torch.float32, device="cuda:0")
    assert recv_tensor.is_contiguous()
    recv_tensors.append(recv_tensor)
    recv_mr_ids.append(recv_mr_id)

# Prepare data for vectorized receive
recv_ptr_list = [tensor.data_ptr() for tensor in recv_tensors]
@@ -394,6 +375,20 @@ assert success

<details><summary>Click me</summary>

### Tensor

#### create_tensor
```python
create_tensor(shape, dtype, device) -> (tensor, tensor_id)
```
Create an empty tensor and register its memory (GPU or pinned CPU).
Automatically handles CUDA device parsing and memory registration.

**Parameters:**
- `shape` (Tuple[int, ...]): Tensor shape, e.g. `(1024,)`
- `dtype` (torch.dtype): Element type, e.g. `torch.float32`
- `device` (str): Target device, e.g. `"cpu"` or `"cuda:0"`

**Returns:**
- `tensor` (torch.Tensor): The newly allocated, registered tensor
- `tensor_id` (int): Memory region ID (Tensor ID) to pass to transfer operations
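
A minimal usage sketch, mirroring the transfer example earlier in this README (the shape, dtype, and device are illustrative; `uccl.utils`, `create_tensor`, and `get_tensor_id_by_tensor` are the helpers introduced by this PR):

```python
import torch
from uccl.utils import create_tensor, get_tensor_id_by_tensor

# Allocate and register a pinned CPU buffer in a single call
buf, mr_id = create_tensor((1024,), dtype=torch.float32, device="cpu")

# The registration ID can also be recovered later from the tensor itself
assert get_tensor_id_by_tensor(buf) == mr_id
```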

### Endpoint Class

#### Constructor
@@ -434,21 +429,6 @@ Accept an incoming connection (blocking).
- `remote_gpu_idx` (int): GPU index of connecting client
- `conn_id` (int): Connection ID for subsequent operations

#### Memory Registration

```python
reg(ptr, size) -> (success, mr_id)
```
Register a memory region for RDMA operations.

**Parameters:**
- `ptr` (int): Memory pointer (use `tensor.data_ptr()` for PyTorch)
- `size` (int): Size in bytes

**Returns:**
- `success` (bool): Whether registration succeeded
- `mr_id` (int): Memory region ID for transfer operations

#### Data Transfer

```python
@@ -458,7 +438,7 @@ Send data to remote endpoint (blocking).

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID from register
- `mr_id` (int): Memory region ID (Tensor ID) of the registered tensor
- `ptr` (int): Pointer to data to send
- `size` (int): Number of bytes to send
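
A minimal blocking-send sketch (assumes an `endpoint` and an established `conn_id` from the connection examples above, plus `create_tensor` from `uccl.utils`; shape and device are illustrative):

```python
send_tensor, mr_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0")
send_tensor.fill_(1.0)  # illustrative payload

nbytes = send_tensor.numel() * send_tensor.element_size()
success = endpoint.send(conn_id, mr_id, send_tensor.data_ptr(), nbytes)
assert success
```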

@@ -472,7 +452,7 @@ Receive data from remote endpoint (blocking).

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID from register
- `mr_id` (int): Memory region ID (Tensor ID) of the registered tensor
- `ptr` (int): Pointer to buffer for received data
- `size` (int): Number of bytes to receive
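
The matching blocking-receive sketch (same assumptions as the send example; the receiver posts a registered buffer of the agreed size):

```python
recv_tensor, mr_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0")

nbytes = recv_tensor.numel() * recv_tensor.element_size()
success = endpoint.recv(conn_id, mr_id, recv_tensor.data_ptr(), nbytes)
assert success
```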

@@ -486,7 +466,7 @@ Send multiple memory regions to remote endpoint in a single operation (blocking)

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id_list` (list[int]): List of memory region IDs from register
- `mr_id_list` (list[int]): List of memory region IDs (Tensor IDs) of the registered tensors
- `ptr_list` (list[int]): List of pointers to data to send
- `size_list` (list[int]): List of sizes in bytes for each memory region
- `num_iovs` (int): Number of I/O vectors (length of the lists)
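
A `sendv` sketch condensed from the vectorized example earlier in this README (same `endpoint`/`conn_id` assumptions; the shapes are illustrative):

```python
shapes = [(512,), (1024,), (256,)]
tensors, mr_ids = [], []
for shape in shapes:
    t, t_id = create_tensor(shape, dtype=torch.float32, device="cuda:0")
    tensors.append(t)
    mr_ids.append(t_id)

ptr_list = [t.data_ptr() for t in tensors]
size_list = [t.numel() * t.element_size() for t in tensors]

success = endpoint.sendv(conn_id, mr_ids, ptr_list, size_list, len(tensors))
assert success
```
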
@@ -501,7 +481,7 @@ Receive multiple memory regions from remote endpoint in a single operation (bloc

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id_list` (list[int]): List of memory region IDs from register
- `mr_id_list` (list[int]): List of memory region IDs (Tensor IDs) of the registered tensors
- `ptr_list` (list[int]): List of pointers to buffers for received data
- `size_list` (list[int]): List of sizes in bytes for each memory region
- `num_iovs` (int): Number of I/O vectors (length of the lists)
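
The matching `recvv` sketch (buffer shapes mirror the sender's; same assumptions as above):

```python
recv_tensors, recv_mr_ids = [], []
for shape in [(512,), (1024,), (256,)]:
    t, t_id = create_tensor(shape, dtype=torch.float32, device="cuda:0")
    recv_tensors.append(t)
    recv_mr_ids.append(t_id)

recv_ptrs = [t.data_ptr() for t in recv_tensors]
recv_sizes = [t.numel() * t.element_size() for t in recv_tensors]

success = endpoint.recvv(conn_id, recv_mr_ids, recv_ptrs, recv_sizes, len(recv_tensors))
assert success
```
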
@@ -518,7 +498,7 @@ Send data to remote endpoint asynchronously (non-blocking).

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID from register
- `mr_id` (int): Memory region ID (Tensor ID) of the registered tensor
- `ptr` (int): Pointer to data to send
- `size` (int): Number of bytes to send
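
An illustrative call shape only (the return value and the matching completion/poll API are collapsed in this diff, so they are not shown):

```python
nbytes = send_tensor.numel() * send_tensor.element_size()
# Issues the send without blocking; completion must be checked separately
# via the library's polling API (collapsed in this diff, see the full README).
ret = endpoint.send_async(conn_id, mr_id, send_tensor.data_ptr(), nbytes)
```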

@@ -533,7 +513,7 @@ Receive data from remote endpoint asynchronously (non-blocking).

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID from register
- `mr_id` (int): Memory region ID (Tensor ID) of the registered tensor
- `ptr` (int): Pointer to buffer for received data
- `size` (int): Exact number of bytes to receive
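
The non-blocking receive, call shape only (the return value and the completion/poll API are collapsed in this diff):

```python
nbytes = recv_tensor.numel() * recv_tensor.element_size()
# Posts the receive without blocking; completion must be checked separately
# via the library's polling API (collapsed in this diff, see the full README).
ret = endpoint.recv_async(conn_id, mr_id, recv_tensor.data_ptr(), nbytes)
```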

@@ -562,7 +542,7 @@ Read data from remote endpoint using one-sided RDMA READ operation (blocking).

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID of remote data to read
- `mr_id` (int): Memory region ID (Tensor ID) of remote data to read
- `dst` (int): Pointer to local destination buffer
- `size` (int): Number of bytes to read
- `slot_item` (FifoItem): Slot item for RDMA operation coordination (contains the remote address to read from)
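
A one-sided read sketch. Here `mr_id` and `slot_item` describe the peer's advertised region and are assumed to have been produced by the peer's `advertise()` call and shipped over out of band; that exchange is not shown in this diff.

```python
# Local destination buffer, allocated and registered via create_tensor
dst_tensor, _local_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0")
nbytes = dst_tensor.numel() * dst_tensor.element_size()

# mr_id / slot_item identify the remote region to read (from the peer's advertise metadata)
success = endpoint.read(conn_id, mr_id, dst_tensor.data_ptr(), nbytes, slot_item)
assert success
```
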
@@ -577,7 +557,7 @@ Read data from remote endpoint using one-sided RDMA READ operation asynchronousl

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID of remote data to read
- `mr_id` (int): Memory region ID (Tensor ID) of remote data to read
- `dst` (int): Pointer to local destination buffer
- `size` (int): Number of bytes to read
- `slot_item` (FifoItem): Slot item for RDMA operation coordination (contains the remote address to read from)
@@ -593,7 +573,7 @@ Read multiple memory regions from remote endpoint using one-sided RDMA READ oper

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id_list` (list[int]): List of memory region IDs of remote data to read
- `mr_id_list` (list[int]): List of memory region IDs (Tensor ID) of remote data to read
- `dst_list` (list[int]): List of pointers to local destination buffers
- `size_list` (list[int]): List of sizes in bytes for each memory region
- `slot_item_list` (list[FifoItem]): List of slot items for RDMA operation coordination (contains the remote address to read from)
@@ -609,7 +589,7 @@ Advertise memory region information to remote endpoint for one-sided RDMA operat

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID to advertise
- `mr_id` (int): Memory region ID (Tensor ID) to advertise
- `addr` (int): Pointer to the memory region
- `len` (int): Size of the memory region in bytes
- `out_buf` (str): Output buffer to store advertisement metadata
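
An illustrative `advertise` sketch. The return value is collapsed in this diff; the `out_buf` capacity, the use of a `bytearray` as a stand-in for the string output buffer, and the way its contents reach the peer are assumptions.

```python
region, mr_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0")
nbytes = region.numel() * region.element_size()

out_buf = bytearray(64)  # assumed capacity for the advertisement metadata
endpoint.advertise(conn_id, mr_id, region.data_ptr(), nbytes, out_buf)
# out_buf now holds the metadata the peer needs (e.g. to build a FifoItem for
# read()/write()); delivering it to the peer is application-defined.
```
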
@@ -624,7 +604,7 @@ Advertise multiple memory regions to remote endpoint for one-sided RDMA operatio

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id_list` (list[int]): List of memory region IDs to advertise
- `mr_id_list` (list[int]): List of memory region IDs (Tensor ID) to advertise
- `addr_list` (list[int]): List of pointers to memory regions
- `len_list` (list[int]): List of sizes in bytes for each memory region
- `out_buf_list` (list[str]): List of output buffers to store advertisement metadata
@@ -640,7 +620,7 @@ Write data to remote endpoint using one-sided RDMA WRITE operation (blocking).

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID of remote destination
- `mr_id` (int): Memory region ID (Tensor ID) of remote destination
- `src` (int): Pointer to local buffer
- `size` (int): Number of bytes to write
- `slot_item` (FifoItem): Slot item for RDMA operation coordination (contains the remote address to write to)
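
A one-sided write sketch, mirroring `read()`; `mr_id` and `slot_item` describe the remote destination and are assumed to come from the peer's `advertise()` metadata (exchange not shown in this diff):

```python
src_tensor, _local_id = create_tensor((1024,), dtype=torch.float32, device="cuda:0")
src_tensor.fill_(1.0)  # illustrative payload
nbytes = src_tensor.numel() * src_tensor.element_size()

success = endpoint.write(conn_id, mr_id, src_tensor.data_ptr(), nbytes, slot_item)
assert success
```
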
@@ -655,7 +635,7 @@ Write data to remote endpoint using one-sided RDMA WRITE operation asynchronousl

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id` (int): Memory region ID of remote destination
- `mr_id` (int): Memory region ID (Tensor ID) of remote destination
- `src` (int): Pointer to local buffer
- `size` (int): Number of bytes to write
- `slot_item` (FifoItem): Slot item for RDMA operation coordination (contains the remote address to write to)
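
The non-blocking variant, call shape only (the return value and the completion/poll API are collapsed in this diff; `mr_id` and `slot_item` are as in the blocking `write()` sketch):

```python
# Issues the one-sided write without blocking; completion is checked
# separately via the library's polling API (collapsed in this diff).
ret = endpoint.write_async(conn_id, mr_id, src_tensor.data_ptr(), nbytes, slot_item)
```
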
@@ -671,7 +651,7 @@ Write multiple memory regions to remote endpoint using one-sided RDMA WRITE oper

**Parameters:**
- `conn_id` (int): Connection ID from connect/accept
- `mr_id_list` (list[int]): List of memory region IDs of remote destinations
- `mr_id_list` (list[int]): List of memory region IDs (Tensor ID) of remote destinations
- `src_list` (list[int]): List of pointers to local buffers
- `size_list` (list[int]): List of sizes in bytes for each memory region
- `slot_item_list` (list[FifoItem]): List of slot items for RDMA operation coordination (contains the remote address to write to)