diff --git a/ci/test_python_other.sh b/ci/test_python_other.sh
index 8f330eb9a30..e69fbd52107 100755
--- a/ci/test_python_other.sh
+++ b/ci/test_python_other.sh
@@ -40,14 +40,10 @@ rapids-logger "pytest custreamz"
   --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/custreamz-coverage.xml" \
   --cov-report=term
 
-# Note that cudf-polars uses rmm.mr.CudaAsyncMemoryResource() which allocates
-# half the available memory. This doesn't play well with multiple workers, so
-# we keep --numprocesses=1 for now. This should be resolved by
-# https://github.com/rapidsai/cudf/issues/16723.
 rapids-logger "pytest cudf-polars"
 ./ci/run_cudf_polars_pytests.sh \
   --junitxml="${RAPIDS_TESTS_DIR}/junit-cudf-polars.xml" \
-  --numprocesses=1 \
+  --numprocesses=6 \
   --dist=worksteal \
   --cov-config=./pyproject.toml \
   --cov=cudf_polars \
diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py
index 3537a66d938..2b445a0a85c 100644
--- a/python/cudf_polars/tests/conftest.py
+++ b/python/cudf_polars/tests/conftest.py
@@ -2,11 +2,9 @@
 # SPDX-License-Identifier: Apache-2.0
 from __future__ import annotations
 
-import os
-
 import pytest
 
-DISTRIBUTED_CLUSTER_KEY = pytest.StashKey[dict]()
+import cudf_polars.callback
 
 
 @pytest.fixture(params=[False, True], ids=["no_nulls", "nulls"], scope="session")
@@ -14,6 +12,20 @@ def with_nulls(request):
     return request.param
 
 
+@pytest.fixture
+def clear_memory_resource_cache():
+    """
+    Clear the cudf_polars.callback.default_memory_resource cache before and after a test.
+
+    This function caches memory resources for the duration of the process. Any test that
+    creates a pool (e.g. ``CudaAsyncMemoryResource``) should use this fixture to ensure that
+    the pool is freed after the test.
+    """
+    cudf_polars.callback.default_memory_resource.cache_clear()
+    yield
+    cudf_polars.callback.default_memory_resource.cache_clear()
+
+
 def pytest_addoption(parser):
     parser.addoption(
         "--executor",
@@ -57,33 +69,3 @@ def pytest_configure(config):
     cudf_polars.testing.asserts.DEFAULT_BLOCKSIZE_MODE = config.getoption(
         "--blocksize-mode"
     )
-
-
-def pytest_sessionstart(session):
-    if (
-        session.config.getoption("--scheduler") == "distributed"
-        and session.config.getoption("--executor") == "streaming"
-    ):
-        from dask import config
-        from dask.distributed import Client
-        from dask_cuda import LocalCUDACluster
-
-        # Avoid "Sending large graph of size ..." warnings
-        # (We expect these for tests using literal/random arrays)
-        config.set({"distributed.admin.large-graph-warning-threshold": "20MB"})
-
-        n_workers = int(os.environ.get("CUDF_POLARS_NUM_WORKERS", "1"))
-        cluster = LocalCUDACluster(n_workers=n_workers)
-        client = Client(cluster)
-        session.stash[DISTRIBUTED_CLUSTER_KEY] = {"cluster": cluster, "client": client}
-
-
-def pytest_sessionfinish(session):
-    if DISTRIBUTED_CLUSTER_KEY in session.stash:
-        cluster_info = session.stash[DISTRIBUTED_CLUSTER_KEY]
-        client = cluster_info.get("client")
-        cluster = cluster_info.get("cluster")
-        if client is not None:
-            client.shutdown()
-        if cluster is not None:
-            cluster.close()
diff --git a/python/cudf_polars/tests/experimental/conftest.py b/python/cudf_polars/tests/experimental/conftest.py
new file mode 100644
index 00000000000..c01c82a4a95
--- /dev/null
+++ b/python/cudf_polars/tests/experimental/conftest.py
@@ -0,0 +1,44 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+# SPDX-License-Identifier: Apache-2.0
+from __future__ import annotations
+
+import os
+
+import pytest
+
+
+@pytest.fixture(autouse=True)
+def dask_cluster(pytestconfig, worker_id):
+    if (
+        pytestconfig.getoption("--scheduler") == "distributed"
+        and pytestconfig.getoption("--executor") == "streaming"
+    ):
+        worker_count = int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "0"))
+        from dask import config
+        from dask_cuda import LocalCUDACluster
+
+        # Avoid "Sending large graph of size ..." warnings
+        # (We expect these for tests using literal/random arrays)
+        config.set({"distributed.admin.large-graph-warning-threshold": "20MB"})
+        if worker_count > 0:
+            # Avoid port conflicts with multiple test runners
+            worker_index = int(worker_id.removeprefix("gw"))
+            scheduler_port = 8800 + worker_index
+            dashboard_address = 8900 + worker_index
+        else:
+            scheduler_port = None
+            dashboard_address = None
+
+        n_workers = int(os.environ.get("CUDF_POLARS_NUM_WORKERS", "1"))
+
+        with (
+            LocalCUDACluster(
+                n_workers=n_workers,
+                scheduler_port=scheduler_port,
+                dashboard_address=dashboard_address,
+            ) as cluster,
+            cluster.get_client(),
+        ):
+            yield
+    else:
+        yield
diff --git a/python/cudf_polars/tests/test_config.py b/python/cudf_polars/tests/test_config.py
index a984054fbf5..ba0f2505d79 100644
--- a/python/cudf_polars/tests/test_config.py
+++ b/python/cudf_polars/tests/test_config.py
@@ -101,6 +101,7 @@ def test_invalid_memory_resource_raises(mr):
     reason="managed memory not supported",
 )
 @pytest.mark.parametrize("enable_managed_memory", ["1", "0"])
+@pytest.mark.usefixtures("clear_memory_resource_cache")
 def test_cudf_polars_enable_disable_managed_memory(monkeypatch, enable_managed_memory):
     q = pl.LazyFrame({"a": [1, 2, 3]})
 