Run distributed tests on XeLink and enable WA with FI_PROVIDER=tcp #1694

Open · wants to merge 12 commits into main
2 changes: 1 addition & 1 deletion .github/workflows/_linux_ut.yml
@@ -336,7 +336,7 @@ jobs:
path: ${{ github.workspace }}/ut_log/ut_failure_list.csv

distributed_ut_test:
- runs-on: pvc_e2e
+ runs-on: pytorch-06
if: contains(inputs.ut, 'xpu_distributed')
timeout-minutes: 900
env:
210 changes: 105 additions & 105 deletions test/xpu/distributed/test_c10d_ops_xccl.py
@@ -159,7 +159,7 @@ def allreduce(tensors, op):
tensors[0],
)

- # # Avg
+ # Avg
# tensors = [torch.tensor([self.rank + 1.0]).xpu(local_device_id).to(dtype)]

# allreduce(tensors, c10d.ReduceOp.AVG)
@@ -194,30 +194,30 @@ def allreduce(tensors, op):
with self.assertRaisesRegex(ValueError, "Cannot use " + err + " with XCCL"):
allreduce(tensors, op)

# @requires_xccl()
# @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
# def test_alltoall_ops_with_xpufree_race(self):
# pg = self.pg
# opts = c10d.AllToAllOptions()
# local_device = f"xpu:{self.rank_to_GPU[self.rank][0]}"
# torch.xpu.set_device(local_device)
# input = torch.rand(1000, 1000, device=local_device)
# output = torch.rand(1000, 1000, device=local_device)
# race_tensors = []
# # create some tensors to race with alltoall collective
# for _ in range(10):
# tmp = []
# for i in range(5):
# tmp.append(torch.rand(10 ** (3 + i), device=local_device))
# race_tensors.append(tmp)

# for i in range(10):
# race_tensors.pop()
# work = pg.alltoall_base(output, input, [], [], opts)
# # this triggers xpuFree
# torch.xpu.empty_cache()
# work.wait()
# torch.xpu.synchronize(device=local_device)
@requires_xccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
def test_alltoall_ops_with_xpufree_race(self):
pg = self.pg
opts = c10d.AllToAllOptions()
local_device = f"xpu:{self.rank_to_GPU[self.rank][0]}"
torch.xpu.set_device(local_device)
input = torch.rand(1000, 1000, device=local_device)
output = torch.rand(1000, 1000, device=local_device)
race_tensors = []
# create some tensors to race with alltoall collective
for _ in range(10):
tmp = []
for i in range(5):
tmp.append(torch.rand(10 ** (3 + i), device=local_device))
race_tensors.append(tmp)

for i in range(10):
race_tensors.pop()
work = pg.alltoall_base(output, input, [], [], opts)
# this triggers xpuFree
torch.xpu.empty_cache()
work.wait()
torch.xpu.synchronize(device=local_device)

@requires_xccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
@@ -840,86 +840,86 @@ def test_batch_isend_irecv(self):

self.assertEqual(recv_tensor, expected_tensor)

# @requires_xccl()
# @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
# def test_all_to_all_single(self):
# device = self.rank_to_GPU[self.rank][0]
# row = self.world_size * (self.rank + 1) * (self.world_size + 1) / 2
# x = torch.ones(int(row), 5, device=device) * (self.rank + 1)
# x.requires_grad = True
# y = torch.empty_like(x)
# split_sizes = [(i + 1) * (self.rank + 1) for i in range(self.world_size)]
# y = torch.distributed.nn.all_to_all_single(
# y, x, output_split_sizes=split_sizes, input_split_sizes=split_sizes
# )
# expected = []
# for idx, tensor in enumerate(torch.split(x, split_sizes)):
# expected.append(torch.full_like(tensor, (idx + 1)))
# expected = torch.cat(expected)
# self.assertEqual(y, expected)
# z = y.sin().sum()
# z.backward()
# x_s = ((self.rank + 1) * torch.ones(int(row), 5, device=device)).cos()
# self.assertEqual(x.grad, x_s)

# @requires_xccl()
# @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
# def test_all_to_all_single_unequal_split(self):
# device = self.rank_to_GPU[self.rank][0]
# in_splits = [i + 1 for i in range(self.world_size)]
# out_splits = [self.rank + 1 for _ in range(self.world_size)]
# in_tensor = torch.ones([sum(in_splits), self.world_size]) * self.rank
# out_tensor = torch.ones([(self.rank + 1) * self.world_size, self.world_size])
# expected_tensor = torch.cat(
# [
# torch.ones([self.rank + 1, self.world_size]) * i
# for i in range(self.world_size)
# ]
# )

# in_tensor = in_tensor.to(device)
# expected_tensor = expected_tensor.to(device)
# out_tensor = out_tensor.to(device)
# dist.all_to_all_single(out_tensor, in_tensor, out_splits, in_splits)
# self.assertEqual(out_tensor, expected_tensor)

# @requires_xccl()
# @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
# def test_all_to_all(self, dtype=torch.float):
# device = self.rank_to_GPU[self.rank][0]
# in_splits = [i + 1 for i in range(self.world_size)]
# in_tensors = [
# torch.ones([in_splits[i], self.world_size], dtype=dtype) * self.rank
# for i in range(self.world_size)
# ]
# out_tensors = [
# torch.ones([(self.rank + 1), self.world_size], dtype=dtype)
# for _ in range(self.world_size)
# ]
# expected_tensors = [
# torch.ones([self.rank + 1, self.world_size], dtype=dtype) * i
# for i in range(self.world_size)
# ]

# in_tensors = [t.to(device) for t in in_tensors]
# expected_tensors = [t.to(device) for t in expected_tensors]
# out_tensors = [t.to(device) for t in out_tensors]
# dist.all_to_all(out_tensors, in_tensors)
# for t1, t2 in zip(out_tensors, expected_tensors):
# self.assertEqual(t1, t2)

# @requires_xccl()
# @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
# def test_all_to_all_single_none(self):
# device = self.rank_to_GPU[self.rank][0]

# send = torch.full((self.world_size, 2), self.rank).to(device)

# out = torch.zeros(self.world_size, 2, dtype=send.dtype).to(device)
# dist.all_to_all_single(out, send)
# self.assertEqual(
# out.tolist(), list(zip(range(self.world_size), range(self.world_size)))
# )
@requires_xccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
def test_all_to_all_single(self):
device = self.rank_to_GPU[self.rank][0]
row = self.world_size * (self.rank + 1) * (self.world_size + 1) / 2
x = torch.ones(int(row), 5, device=device) * (self.rank + 1)
x.requires_grad = True
y = torch.empty_like(x)
split_sizes = [(i + 1) * (self.rank + 1) for i in range(self.world_size)]
y = torch.distributed.nn.all_to_all_single(
y, x, output_split_sizes=split_sizes, input_split_sizes=split_sizes
)
expected = []
for idx, tensor in enumerate(torch.split(x, split_sizes)):
expected.append(torch.full_like(tensor, (idx + 1)))
expected = torch.cat(expected)
self.assertEqual(y, expected)
z = y.sin().sum()
z.backward()
x_s = ((self.rank + 1) * torch.ones(int(row), 5, device=device)).cos()
self.assertEqual(x.grad, x_s)

@requires_xccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
def test_all_to_all_single_unequal_split(self):
device = self.rank_to_GPU[self.rank][0]
in_splits = [i + 1 for i in range(self.world_size)]
out_splits = [self.rank + 1 for _ in range(self.world_size)]
in_tensor = torch.ones([sum(in_splits), self.world_size]) * self.rank
out_tensor = torch.ones([(self.rank + 1) * self.world_size, self.world_size])
expected_tensor = torch.cat(
[
torch.ones([self.rank + 1, self.world_size]) * i
for i in range(self.world_size)
]
)

in_tensor = in_tensor.to(device)
expected_tensor = expected_tensor.to(device)
out_tensor = out_tensor.to(device)
dist.all_to_all_single(out_tensor, in_tensor, out_splits, in_splits)
self.assertEqual(out_tensor, expected_tensor)

@requires_xccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
def test_all_to_all(self, dtype=torch.float):
device = self.rank_to_GPU[self.rank][0]
in_splits = [i + 1 for i in range(self.world_size)]
in_tensors = [
torch.ones([in_splits[i], self.world_size], dtype=dtype) * self.rank
for i in range(self.world_size)
]
out_tensors = [
torch.ones([(self.rank + 1), self.world_size], dtype=dtype)
for _ in range(self.world_size)
]
expected_tensors = [
torch.ones([self.rank + 1, self.world_size], dtype=dtype) * i
for i in range(self.world_size)
]

in_tensors = [t.to(device) for t in in_tensors]
expected_tensors = [t.to(device) for t in expected_tensors]
out_tensors = [t.to(device) for t in out_tensors]
dist.all_to_all(out_tensors, in_tensors)
for t1, t2 in zip(out_tensors, expected_tensors):
self.assertEqual(t1, t2)

@requires_xccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "XCCL test requires 2+ GPUs")
def test_all_to_all_single_none(self):
device = self.rank_to_GPU[self.rank][0]

send = torch.full((self.world_size, 2), self.rank).to(device)

out = torch.zeros(self.world_size, 2, dtype=send.dtype).to(device)
dist.all_to_all_single(out, send)
self.assertEqual(
out.tolist(), list(zip(range(self.world_size), range(self.world_size)))
)


instantiate_parametrized_tests(ProcessGroupXCCLOpTest)
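Note that the hunks above re-enable previously commented-out all-to-all tests rather than adding new logic. As a rough sketch of how these could be exercised locally under the same workaround this PR wires into CI (the pytest invocation and -k filter are assumptions for illustration, not part of the diff):

```python
# Sketch only, not part of this PR: run the re-enabled XCCL all-to-all tests
# with the libfabric TCP-provider workaround exported first. The pytest
# invocation and -k filter below are assumptions for illustration.
import os
import subprocess

env = dict(os.environ, FI_PROVIDER="tcp")  # same WA run_distributed.py sets
subprocess.run(
    [
        "python", "-m", "pytest", "-v",
        "test/xpu/distributed/test_c10d_ops_xccl.py",
        "-k", "all_to_all or alltoall",
    ],
    env=env,
    check=True,
)
```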
2 changes: 2 additions & 0 deletions test/xpu/run_distributed.py
@@ -9,7 +9,9 @@
res2 = 0
fail_test = []

+ # libfabric WA to avoid hang issue
+ os.environ["FI_PROVIDER"] = "tcp"
# os.environ["ZE_AFFINITY_MASK"] = "0,1,2,3"


# run python test
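For context on why the added lines sit at module top level: libfabric reads FI_PROVIDER when the collective backend initializes, so the variable has to be in the environment before the process group is created. A minimal sketch of that ordering (the "xccl" backend name and a torchrun-style launch are assumptions here, not something this diff shows):

```python
# Minimal sketch, not part of this PR: the FI_PROVIDER workaround must be set
# before init_process_group, since libfabric selects its provider during
# backend initialization.
import os

os.environ.setdefault("FI_PROVIDER", "tcp")  # the WA this PR applies

import torch
import torch.distributed as dist

# Assumes a torchrun-style launch that provides rank/world-size env vars,
# and that the "xccl" backend is available in this PyTorch build.
dist.init_process_group(backend="xccl")
rank = dist.get_rank()
t = torch.ones(4, device=f"xpu:{rank % torch.xpu.device_count()}")
dist.all_reduce(t)  # exercises XCCL over the TCP provider
dist.destroy_process_group()
```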