[SPARK-43122][CONNECT][PYTHON][ML][TESTS] Reenable TorchDistributorLocalUnitTestsOnConnect and TorchDistributorLocalUnitTestsIIOnConnect

### What changes were proposed in this pull request?
`TorchDistributorLocalUnitTestsOnConnect` and `TorchDistributorLocalUnitTestsIIOnConnect` were unstable and occasionally got stuck; however, I cannot reproduce the issue locally.

The two UTs were disabled, and this PR reenables them. I found that all the PyTorch tests set up the regular or Connect sessions in `setUp` and close them in `tearDown`; such session operations are very expensive and should be placed in `setUpClass` and `tearDownClass` instead. After this change, the related tests seem much more stable. So I think the root cause is still resource-related: since `TorchDistributor` runs in barrier mode, the tests just keep waiting when there are not enough resources in GitHub Actions.
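
A minimal sketch of the per-class session pattern described above (a hypothetical test class, assuming a PySpark build with Spark Connect support; it is not part of this PR's diff):

```python
import unittest

from pyspark.sql import SparkSession


class ExampleTorchParityTest(unittest.TestCase):
    # Hypothetical example: the (expensive) Connect session is created once
    # per test class instead of once per test method.
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.remote("local[4]").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_range_count(self):
        # Every test method in the class reuses the shared session.
        self.assertEqual(self.spark.range(10).count(), 10)
```

Compared with creating the session in `setUp`/`tearDown`, the remote session is built and stopped once per class, which is the change the diff below applies to each test class.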

### Why are the changes needed?
For test coverage.

### Does this PR introduce _any_ user-facing change?
No, test-only

### How was this patch tested?
CI

Closes apache#40793 from zhengruifeng/torch_reenable.

Lead-authored-by: Ruifeng Zheng <[email protected]>
Co-authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
zhengruifeng committed Apr 18, 2023
1 parent db2625c commit dc84e52
Showing 2 changed files with 158 additions and 130 deletions.
111 changes: 61 additions & 50 deletions python/pyspark/ml/tests/connect/test_parity_torch_distributor.py
@@ -17,7 +17,6 @@

import os
import shutil
import tempfile
import unittest

have_torch = True
@@ -33,38 +32,45 @@
TorchDistributorLocalUnitTestsMixin,
TorchDistributorDistributedUnitTestsMixin,
TorchWrapperUnitTestsMixin,
set_up_test_dirs,
get_local_mode_conf,
get_distributed_mode_conf,
)


@unittest.skipIf(not have_torch, "torch is required")
class TorchDistributorBaselineUnitTestsOnConnect(
TorchDistributorBaselineUnitTestsMixin, unittest.TestCase
):
def setUp(self) -> None:
self.spark = SparkSession.builder.remote("local[4]").getOrCreate()
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder.remote("local[4]").getOrCreate()

def tearDown(self) -> None:
self.spark.stop()
@classmethod
def tearDownClass(cls):
cls.spark.stop()


@unittest.skip("unstable, ignore for now")
@unittest.skipIf(not have_torch, "torch is required")
class TorchDistributorLocalUnitTestsOnConnect(
TorchDistributorLocalUnitTestsMixin, unittest.TestCase
):
def setUp(self) -> None:
class_name = self.__class__.__name__
conf = self._get_spark_conf()
builder = SparkSession.builder.appName(class_name)
for k, v in conf.getAll():
if k not in ["spark.master", "spark.remote", "spark.app.name"]:
builder = builder.config(k, v)
self.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
self.mnist_dir_path = tempfile.mkdtemp()

def tearDown(self) -> None:
shutil.rmtree(self.mnist_dir_path)
os.unlink(self.gpu_discovery_script_file.name)
self.spark.stop()
@classmethod
def setUpClass(cls):
(cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
builder = SparkSession.builder.appName(cls.__name__)
for k, v in get_local_mode_conf().items():
builder = builder.config(k, v)
builder = builder.config(
"spark.driver.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
)
cls.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()

@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.mnist_dir_path)
os.unlink(cls.gpu_discovery_script_file_name)
cls.spark.stop()

def _get_inputs_for_test_local_training_succeeds(self):
return [
@@ -75,24 +81,27 @@ def _get_inputs_for_test_local_training_succeeds(self):
]


@unittest.skip("unstable, ignore for now")
@unittest.skipIf(not have_torch, "torch is required")
class TorchDistributorLocalUnitTestsIIOnConnect(
TorchDistributorLocalUnitTestsMixin, unittest.TestCase
):
def setUp(self) -> None:
class_name = self.__class__.__name__
conf = self._get_spark_conf()
builder = SparkSession.builder.appName(class_name)
for k, v in conf.getAll():
if k not in ["spark.master", "spark.remote", "spark.app.name"]:
builder = builder.config(k, v)
self.spark = builder.remote("local[4]").getOrCreate()
self.mnist_dir_path = tempfile.mkdtemp()

def tearDown(self) -> None:
shutil.rmtree(self.mnist_dir_path)
os.unlink(self.gpu_discovery_script_file.name)
self.spark.stop()
@classmethod
def setUpClass(cls):
(cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
builder = SparkSession.builder.appName(cls.__name__)
for k, v in get_local_mode_conf().items():
builder = builder.config(k, v)

builder = builder.config(
"spark.driver.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
)
cls.spark = builder.remote("local[4]").getOrCreate()

@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.mnist_dir_path)
os.unlink(cls.gpu_discovery_script_file_name)
cls.spark.stop()

def _get_inputs_for_test_local_training_succeeds(self):
return [
@@ -107,21 +116,23 @@ def _get_inputs_for_test_local_training_succeeds(self):
class TorchDistributorDistributedUnitTestsOnConnect(
TorchDistributorDistributedUnitTestsMixin, unittest.TestCase
):
def setUp(self) -> None:
class_name = self.__class__.__name__
conf = self._get_spark_conf()
builder = SparkSession.builder.appName(class_name)
for k, v in conf.getAll():
if k not in ["spark.master", "spark.remote", "spark.app.name"]:
builder = builder.config(k, v)

self.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
self.mnist_dir_path = tempfile.mkdtemp()

def tearDown(self) -> None:
shutil.rmtree(self.mnist_dir_path)
os.unlink(self.gpu_discovery_script_file.name)
self.spark.stop()
@classmethod
def setUpClass(cls):
(cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
builder = SparkSession.builder.appName(cls.__name__)
for k, v in get_distributed_mode_conf().items():
builder = builder.config(k, v)

builder = builder.config(
"spark.worker.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
)
cls.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()

@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.mnist_dir_path)
os.unlink(cls.gpu_discovery_script_file_name)
cls.spark.stop()


@unittest.skipIf(not have_torch, "torch is required")
177 changes: 97 additions & 80 deletions python/pyspark/ml/torch/tests/test_distributor.py
@@ -122,6 +122,45 @@ def train_fn(learning_rate: float) -> Any:
return train_fn


def set_up_test_dirs():
gpu_discovery_script_file = tempfile.NamedTemporaryFile(delete=False)
gpu_discovery_script_file_name = gpu_discovery_script_file.name
try:
gpu_discovery_script_file.write(
b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\",\\"1\\",\\"2\\"]}'
)
finally:
gpu_discovery_script_file.close()

# create temporary directory for Worker resources coordination
tempdir = tempfile.NamedTemporaryFile(delete=False)
os.unlink(tempdir.name)
os.chmod(
gpu_discovery_script_file_name,
stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | stat.S_IXOTH,
)
mnist_dir_path = tempfile.mkdtemp()

return (gpu_discovery_script_file_name, mnist_dir_path)


def get_local_mode_conf():
return {
"spark.test.home": SPARK_HOME,
"spark.driver.resource.gpu.amount": "3",
}


def get_distributed_mode_conf():
return {
"spark.test.home": SPARK_HOME,
"spark.worker.resource.gpu.amount": "3",
"spark.task.cpus": "2",
"spark.task.resource.gpu.amount": "1",
"spark.executor.resource.gpu.amount": "1",
}


class TorchDistributorBaselineUnitTestsMixin:
def setup_env_vars(self, input_map: Dict[str, str]) -> None:
for key, value in input_map.items():
@@ -271,37 +310,18 @@ def test_create_torchrun_command(self) -> None:

@unittest.skipIf(not have_torch, "torch is required")
class TorchDistributorBaselineUnitTests(TorchDistributorBaselineUnitTestsMixin, unittest.TestCase):
def setUp(self) -> None:
@classmethod
def setUpClass(cls):
conf = SparkConf()
sc = SparkContext("local[4]", conf=conf)
self.spark = SparkSession(sc)
cls.spark = SparkSession(sc)

def tearDown(self) -> None:
self.spark.stop()
@classmethod
def tearDownClass(cls):
cls.spark.stop()


class TorchDistributorLocalUnitTestsMixin:
def _get_spark_conf(self) -> SparkConf:
self.gpu_discovery_script_file = tempfile.NamedTemporaryFile(delete=False)
self.gpu_discovery_script_file.write(
b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\",\\"1\\",\\"2\\"]}'
)
self.gpu_discovery_script_file.close()
# create temporary directory for Worker resources coordination
self.tempdir = tempfile.NamedTemporaryFile(delete=False)
os.unlink(self.tempdir.name)
os.chmod(
self.gpu_discovery_script_file.name,
stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | stat.S_IXOTH,
)

conf = SparkConf().set("spark.test.home", SPARK_HOME)
conf = conf.set("spark.driver.resource.gpu.amount", "3")
conf = conf.set(
"spark.driver.resource.gpu.discoveryScript", self.gpu_discovery_script_file.name
)
return conf

def setup_env_vars(self, input_map: Dict[str, str]) -> None:
for key, value in input_map.items():
os.environ[key] = value
@@ -382,59 +402,49 @@ def test_end_to_end_run_locally(self) -> None:

@unittest.skipIf(not have_torch, "torch is required")
class TorchDistributorLocalUnitTests(TorchDistributorLocalUnitTestsMixin, unittest.TestCase):
def setUp(self) -> None:
class_name = self.__class__.__name__
conf = self._get_spark_conf()
sc = SparkContext("local-cluster[2,2,1024]", class_name, conf=conf)
self.spark = SparkSession(sc)
self.mnist_dir_path = tempfile.mkdtemp()
@classmethod
def setUpClass(cls):
(cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
conf = SparkConf()
for k, v in get_local_mode_conf().items():
conf = conf.set(k, v)
conf = conf.set(
"spark.driver.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
)

sc = SparkContext("local-cluster[2,2,1024]", cls.__name__, conf=conf)
cls.spark = SparkSession(sc)

def tearDown(self) -> None:
shutil.rmtree(self.mnist_dir_path)
os.unlink(self.gpu_discovery_script_file.name)
self.spark.stop()
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.mnist_dir_path)
os.unlink(cls.gpu_discovery_script_file_name)
cls.spark.stop()


@unittest.skipIf(not have_torch, "torch is required")
class TorchDistributorLocalUnitTestsII(TorchDistributorLocalUnitTestsMixin, unittest.TestCase):
def setUp(self) -> None:
class_name = self.__class__.__name__
conf = self._get_spark_conf()
sc = SparkContext("local[4]", class_name, conf=conf)
self.spark = SparkSession(sc)
self.mnist_dir_path = tempfile.mkdtemp()
@classmethod
def setUpClass(cls):
(cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
conf = SparkConf()
for k, v in get_local_mode_conf().items():
conf = conf.set(k, v)
conf = conf.set(
"spark.driver.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
)

def tearDown(self) -> None:
shutil.rmtree(self.mnist_dir_path)
os.unlink(self.gpu_discovery_script_file.name)
self.spark.stop()
sc = SparkContext("local[4]", cls.__name__, conf=conf)
cls.spark = SparkSession(sc)

@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.mnist_dir_path)
os.unlink(cls.gpu_discovery_script_file_name)
cls.spark.stop()

class TorchDistributorDistributedUnitTestsMixin:
def _get_spark_conf(self) -> SparkConf:
self.gpu_discovery_script_file = tempfile.NamedTemporaryFile(delete=False)
self.gpu_discovery_script_file.write(
b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\",\\"1\\",\\"2\\"]}'
)
self.gpu_discovery_script_file.close()
# create temporary directory for Worker resources coordination
tempdir = tempfile.NamedTemporaryFile(delete=False)
os.unlink(tempdir.name)
os.chmod(
self.gpu_discovery_script_file.name,
stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | stat.S_IXOTH,
)

conf = SparkConf().set("spark.test.home", SPARK_HOME)
conf = conf.set(
"spark.worker.resource.gpu.discoveryScript", self.gpu_discovery_script_file.name
)
conf = conf.set("spark.worker.resource.gpu.amount", "3")
conf = conf.set("spark.task.cpus", "2")
conf = conf.set("spark.task.resource.gpu.amount", "1")
conf = conf.set("spark.executor.resource.gpu.amount", "1")
return conf

class TorchDistributorDistributedUnitTestsMixin:
def test_dist_training_succeeds(self) -> None:
CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
inputs = [
@@ -482,17 +492,24 @@ def test_end_to_end_run_distributedly(self) -> None:
class TorchDistributorDistributedUnitTests(
TorchDistributorDistributedUnitTestsMixin, unittest.TestCase
):
def setUp(self) -> None:
class_name = self.__class__.__name__
conf = self._get_spark_conf()
sc = SparkContext("local-cluster[2,2,1024]", class_name, conf=conf)
self.spark = SparkSession(sc)
self.mnist_dir_path = tempfile.mkdtemp()

def tearDown(self) -> None:
shutil.rmtree(self.mnist_dir_path)
os.unlink(self.gpu_discovery_script_file.name)
self.spark.stop()
@classmethod
def setUpClass(cls):
(cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
conf = SparkConf()
for k, v in get_distributed_mode_conf().items():
conf = conf.set(k, v)
conf = conf.set(
"spark.worker.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
)

sc = SparkContext("local-cluster[2,2,1024]", cls.__name__, conf=conf)
cls.spark = SparkSession(sc)

@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.mnist_dir_path)
os.unlink(cls.gpu_discovery_script_file_name)
cls.spark.stop()


class TorchWrapperUnitTestsMixin: