diff --git a/changelog/1180.feature.rst b/changelog/1180.feature.rst
new file mode 100644
index 00000000..1c2ae4fd
--- /dev/null
+++ b/changelog/1180.feature.rst
@@ -0,0 +1 @@
+Added a new 'singlecollect' distribution mode that collects tests only once, on a single worker, and skips collection verification on the other workers. This can significantly improve startup time for test suites with expensive collection.
\ No newline at end of file
diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py
index 5bf7d980..2e53bc99 100644
--- a/src/xdist/dsession.py
+++ b/src/xdist/dsession.py
@@ -20,6 +20,7 @@
 from xdist.scheduler import LoadScheduling
 from xdist.scheduler import LoadScopeScheduling
 from xdist.scheduler import Scheduling
+from xdist.scheduler import SingleCollectScheduling
 from xdist.scheduler import WorkStealingScheduling
 from xdist.workermanage import NodeManager
 from xdist.workermanage import WorkerController
@@ -123,6 +124,8 @@ def pytest_xdist_make_scheduler(
         return LoadGroupScheduling(config, log)
     if dist == "worksteal":
         return WorkStealingScheduling(config, log)
+    if dist == "singlecollect":
+        return SingleCollectScheduling(config, log)
     return None
 
 @pytest.hookimpl
diff --git a/src/xdist/plugin.py b/src/xdist/plugin.py
index 0cf90f86..624fe9f1 100644
--- a/src/xdist/plugin.py
+++ b/src/xdist/plugin.py
@@ -108,6 +108,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
             "loadfile",
             "loadgroup",
             "worksteal",
+            "singlecollect",
             "no",
         ],
         dest="dist",
@@ -124,6 +125,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
             "loadgroup: Like 'load', but sends tests marked with 'xdist_group' to the same worker.\n\n"
             "worksteal: Split the test suite between available environments,"
             " then re-balance when any worker runs out of tests.\n\n"
+            "singlecollect: Only collect tests once on a single worker and skip collection verification.\n\n"
             "(default) no: Run tests inprocess, don't distribute."
         ),
     )
diff --git a/src/xdist/scheduler/__init__.py b/src/xdist/scheduler/__init__.py
index b4894732..e531f91e 100644
--- a/src/xdist/scheduler/__init__.py
+++ b/src/xdist/scheduler/__init__.py
@@ -4,4 +4,7 @@
 from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling
 from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling
 from xdist.scheduler.protocol import Scheduling as Scheduling
+from xdist.scheduler.singlecollect import (
+    SingleCollectScheduling as SingleCollectScheduling,
+)
 from xdist.scheduler.worksteal import WorkStealingScheduling as WorkStealingScheduling
diff --git a/src/xdist/scheduler/singlecollect.py b/src/xdist/scheduler/singlecollect.py
new file mode 100644
index 00000000..40626d3d
--- /dev/null
+++ b/src/xdist/scheduler/singlecollect.py
@@ -0,0 +1,219 @@
+from __future__ import annotations
+
+from collections.abc import Sequence
+from itertools import cycle
+
+import pytest
+
+from xdist.remote import Producer
+from xdist.workermanage import parse_tx_spec_config
+from xdist.workermanage import WorkerController
+
+
+class SingleCollectScheduling:
+    """Implement scheduling with a single test collection phase.
+
+    This differs from LoadScheduling by:
+    1. Using only the first node's test collection
+    2. Ignoring the collections reported by the other nodes
+    3. Not checking for collection equality
+
+    This can significantly improve startup time by avoiding redundant collection
+    and collection verification across multiple worker processes.
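+
+    Example invocation (the exact ``-n`` worker count is arbitrary)::
+
+        pytest -n 4 --dist=singlecollect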
+    """
+
+    def __init__(self, config: pytest.Config, log: Producer | None = None) -> None:
+        self.numnodes = len(parse_tx_spec_config(config))
+        self.node2pending: dict[WorkerController, list[int]] = {}
+        self.pending: list[int] = []
+        self.collection: list[str] | None = None
+        self.first_node: WorkerController | None = None
+        if log is None:
+            self.log = Producer("singlecollect")
+        else:
+            self.log = log.singlecollect
+        self.config = config
+        self.maxschedchunk = self.config.getoption("maxschedchunk")
+        self.collection_done = False
+
+    @property
+    def nodes(self) -> list[WorkerController]:
+        """A list of all nodes in the scheduler."""
+        return list(self.node2pending.keys())
+
+    @property
+    def collection_is_completed(self) -> bool:
+        """Return True once we have collected tests from the first node."""
+        return self.collection_done
+
+    @property
+    def tests_finished(self) -> bool:
+        """Return True if all tests have been executed by the nodes."""
+        if not self.collection_is_completed:
+            return False
+        if self.pending:
+            return False
+        for pending in self.node2pending.values():
+            if len(pending) >= 2:
+                return False
+        return True
+
+    @property
+    def has_pending(self) -> bool:
+        """Return True if there are pending test items."""
+        if self.pending:
+            return True
+        for pending in self.node2pending.values():
+            if pending:
+                return True
+        return False
+
+    def add_node(self, node: WorkerController) -> None:
+        """Add a new node to the scheduler."""
+        assert node not in self.node2pending
+        self.node2pending[node] = []
+
+        # Remember the first node as our collector
+        if self.first_node is None:
+            self.first_node = node
+            self.log(f"Using {node.gateway.id} as collection node")
+
+    def add_node_collection(
+        self, node: WorkerController, collection: Sequence[str]
+    ) -> None:
+        """Only use collection from the first node."""
+        # We only care about collection from the first node
+        if node == self.first_node:
+            self.log(f"Received collection from first node {node.gateway.id}")
+            self.collection = list(collection)
+            self.collection_done = True
+        else:
+            # Skip collection verification for other nodes
+            self.log(f"Ignoring collection from node {node.gateway.id}")
+
+    def mark_test_complete(
+        self, node: WorkerController, item_index: int | str, duration: float = 0
+    ) -> None:
+        """Mark test item as completed by node."""
+        self.node2pending[node].remove(
+            int(item_index) if isinstance(item_index, str) else item_index
+        )
+        self.check_schedule(node, duration=duration)
+
+    def mark_test_pending(self, item: str) -> None:
+        assert self.collection is not None
+        self.pending.insert(
+            0,
+            self.collection.index(item),
+        )
+        for node in self.node2pending:
+            self.check_schedule(node)
+
+    def remove_pending_tests_from_node(
+        self,
+        node: WorkerController,
+        indices: Sequence[int],
+    ) -> None:
+        raise NotImplementedError()
+
+    def check_schedule(self, node: WorkerController, duration: float = 0) -> None:
+        """Maybe schedule new items on the node."""
+        if node.shutting_down:
+            return
+
+        if self.pending:
+            # how many nodes do we have?
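+            # aim: keep each queue between ~1/4 and ~1/2 of an even share (min 2)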
+            num_nodes = len(self.node2pending)
+            # if our node goes below a heuristic minimum, fill it out to
+            # heuristic maximum
+            items_per_node_min = max(2, len(self.pending) // num_nodes // 4)
+            items_per_node_max = max(2, len(self.pending) // num_nodes // 2)
+            node_pending = self.node2pending[node]
+            if len(node_pending) < items_per_node_min:
+                if duration >= 0.1 and len(node_pending) >= 2:
+                    # seems the node is doing long-running tests
+                    # and has enough items to continue
+                    # so let's rather wait with sending new items
+                    return
+                num_send = items_per_node_max - len(node_pending)
+                # keep at least 2 tests pending even if --maxschedchunk=1
+                maxschedchunk = max(2 - len(node_pending), self.maxschedchunk)
+                self._send_tests(node, min(num_send, maxschedchunk))
+        else:
+            node.shutdown()
+
+        self.log("num items waiting for node:", len(self.pending))
+
+    def remove_node(self, node: WorkerController) -> str | None:
+        """Remove a node from the scheduler."""
+        pending = self.node2pending.pop(node)
+
+        # If this is the first node (collector), reset it
+        if node == self.first_node:
+            self.first_node = None
+
+        if not pending:
+            return None
+
+        # Reassign pending items if the node had any
+        assert self.collection is not None
+        crashitem = self.collection[pending.pop(0)]
+        self.pending.extend(pending)
+        for node in self.node2pending:
+            self.check_schedule(node)
+        return crashitem
+
+    def schedule(self) -> None:
+        """Initiate distribution of the test collection."""
+        assert self.collection_is_completed
+
+        # Initial distribution already happened, reschedule on all nodes
+        if self.pending:
+            for node in self.nodes:
+                self.check_schedule(node)
+            return
+
+        # Initialize the index of pending items
+        assert self.collection is not None
+        self.pending[:] = range(len(self.collection))
+        if not self.collection:
+            return
+
+        if self.maxschedchunk is None:
+            self.maxschedchunk = len(self.collection)
+
+        # Send a batch of tests to run. If we don't have at least two
+        # tests per node, we have to send them all so that we can send
+        # shutdown signals and get all nodes working.
+        if len(self.pending) < 2 * len(self.nodes):
+            # Distribute tests round-robin
+            nodes = cycle(self.nodes)
+            for _ in range(len(self.pending)):
+                self._send_tests(next(nodes), 1)
+        else:
+            # roughly how many items per node do we have?
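+            # initial chunk: ~1/4 of an even share (min 2), capped by --maxschedchunk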
+            items_per_node = len(self.collection) // len(self.node2pending)
+            # take a fraction of tests for initial distribution
+            node_chunksize = min(items_per_node // 4, self.maxschedchunk)
+            node_chunksize = max(node_chunksize, 2)
+            # and initialize each node with a chunk of tests
+            for node in self.nodes:
+                self._send_tests(node, node_chunksize)
+
+        if not self.pending:
+            # initial distribution sent all tests, start node shutdown
+            for node in self.nodes:
+                node.shutdown()
+
+    def _send_tests(self, node: WorkerController, num: int) -> None:
+        tests_per_node = self.pending[:num]
+        if tests_per_node:
+            del self.pending[:num]
+            self.node2pending[node].extend(tests_per_node)
+            node.send_runtest_some(tests_per_node)
diff --git a/testing/acceptance_test.py b/testing/acceptance_test.py
index 3ef10cc9..a717b68c 100644
--- a/testing/acceptance_test.py
+++ b/testing/acceptance_test.py
@@ -10,6 +10,69 @@
 import xdist
 
 
+class TestSingleCollectScheduling:
+    def test_singlecollect_mode(self, pytester: pytest.Pytester) -> None:
+        """Test that the singlecollect distribution mode works."""
+        # Create a simple test file
+        p1 = pytester.makepyfile(
+            """
+            def test_ok():
+                pass
+            """
+        )
+        result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v")
+        assert result.ret == 0
+        result.stdout.fnmatch_lines(["*1 passed*"])
+        # Make sure the tests are correctly distributed
+        result.stdout.fnmatch_lines(["*scheduling tests via SingleCollectScheduling*"])
+
+    def test_singlecollect_many_tests(self, pytester: pytest.Pytester) -> None:
+        """Test that the singlecollect mode correctly distributes many tests."""
+        # Create test file with multiple tests
+        p1 = pytester.makepyfile(
+            """
+            import pytest
+            @pytest.mark.parametrize("x", range(10))
+            def test_ok(x):
+                assert True
+            """
+        )
+        result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v")
+        assert result.ret == 0
+        result.stdout.fnmatch_lines(["*passed*"])
+        # Make sure the tests are correctly distributed
+        result.stdout.fnmatch_lines(["*scheduling tests via SingleCollectScheduling*"])
+
+    def test_singlecollect_failure(self, pytester: pytest.Pytester) -> None:
+        """Test that failures are correctly reported with singlecollect mode."""
+        p1 = pytester.makepyfile(
+            """
+            def test_fail():
+                assert 0
+            """
+        )
+        result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v")
+        assert result.ret == 1
+        result.stdout.fnmatch_lines(["*1 failed*"])
+
+    def test_singlecollect_handles_fixtures(self, pytester: pytest.Pytester) -> None:
+        """Test that fixtures work correctly with singlecollect mode."""
+        pytester.makepyfile(
+            """
+            import pytest
+
+            @pytest.fixture
+            def my_fixture():
+                return 42
+            def test_with_fixture(my_fixture):
+                assert my_fixture == 42
+            """
+        )
+        result = pytester.runpytest("-n2", "--dist=singlecollect", "-v")
+        assert result.ret == 0
+        result.stdout.fnmatch_lines(["*1 passed*"])
+
+
 class TestDistribution:
     def test_n1_pass(self, pytester: pytest.Pytester) -> None:
         p1 = pytester.makepyfile(
diff --git a/testing/test_dsession.py b/testing/test_dsession.py
index 680b7ae0..a1020d0b 100644
--- a/testing/test_dsession.py
+++ b/testing/test_dsession.py
@@ -15,6 +15,7 @@
 from xdist.report import report_collection_diff
 from xdist.scheduler import EachScheduling
 from xdist.scheduler import LoadScheduling
+from xdist.scheduler import SingleCollectScheduling
 from xdist.scheduler import WorkStealingScheduling
 from xdist.workermanage import WorkerController
 
@@ -467,6 +468,137 @@ def pytest_collectreport(self, report: pytest.CollectReport) -> None:
         assert "Different tests were collected between" in rep.longrepr
 
 
+class TestSingleCollectScheduling:
+    def test_initialization(self, pytester: pytest.Pytester) -> None:
+        config = pytester.parseconfig("--tx=2*popen")
+        sched = SingleCollectScheduling(config)
+        assert sched.first_node is None
+        assert sched.collection is None
+        assert not sched.collection_done
+        assert not sched.collection_is_completed
+        assert len(sched.node2pending) == 0
+
+    def test_add_node_and_collection(self, pytester: pytest.Pytester) -> None:
+        config = pytester.parseconfig("--tx=2*popen")
+        sched = SingleCollectScheduling(config)
+        node1, node2 = MockNode(), MockNode()
+
+        # First node should be set as collector
+        sched.add_node(node1)
+        assert sched.first_node == node1
+
+        # Second node should not become the collector
+        sched.add_node(node2)
+        assert sched.first_node == node1
+
+        # Collection from first node should be used
+        collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3"]
+        sched.add_node_collection(node1, collection)
+        assert sched.collection == collection
+        assert sched.collection_done
+        assert sched.collection_is_completed
+
+        # Collection from second node should be ignored
+        different_collection = ["a.py::test_1", "a.py::test_4"]
+        sched.add_node_collection(node2, different_collection)
+        # Collection should not change - still using the first node's collection
+        assert sched.collection == collection
+
+    def test_schedule_tests(self, pytester: pytest.Pytester) -> None:
+        config = pytester.parseconfig("--tx=2*popen")
+        sched = SingleCollectScheduling(config)
+        node1, node2 = MockNode(), MockNode()
+        sched.add_node(node1)
+        sched.add_node(node2)
+
+        collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3", "a.py::test_4"]
+        sched.add_node_collection(node1, collection)
+
+        # Should use collection from node1 and distribute tests
+        sched.schedule()
+
+        # Check that tests were distributed across both nodes
+        assert len(node1.sent) > 0
+        assert len(node2.sent) > 0
+
+        # Tests should be distributed completely
+        all_tests = node1.sent + node2.sent
+        assert sorted(all_tests) == list(range(len(collection)))
+
+        # The pending list should be empty after distribution
+        assert not sched.pending
+
+    def test_handle_node_failure(self, pytester: pytest.Pytester) -> None:
+        config = pytester.parseconfig("--tx=2*popen")
+        sched = SingleCollectScheduling(config)
+        node1, node2 = MockNode(), MockNode()
+        sched.add_node(node1)
+        sched.add_node(node2)
+
+        collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3", "a.py::test_4"]
+        sched.add_node_collection(node1, collection)
+        sched.schedule()
+
+        # Simulate node1 completing a test
+        test_idx = node1.sent[0]
+        sched.mark_test_complete(node1, test_idx)
+
+        # Now remove node2 (simulating failure)
+        sched.remove_node(node2)
+
+        # Tests assigned to node2 should go back to pending
+        assert len(sched.pending) > 0
+
+        # Add a new node
+        node3 = MockNode()
+        sched.add_node(node3)
+
+        # Since collection is already completed, schedule should assign pending tests to node3
+        sched.schedule()
+        assert len(node3.sent) > 0
+
+        # Complete all tests
+        for idx in node1.sent:
+            if idx != test_idx:  # Skip the one we already completed
+                sched.mark_test_complete(node1, idx)
+
+        for idx in node3.sent:
+            sched.mark_test_complete(node3, idx)
+
+        # All tests should be completed
+        assert sched.tests_finished
+
+    def test_first_node_failure(self, pytester: pytest.Pytester) -> None:
+        """Test what happens when the collecting node fails before collection is done."""
+        config = pytester.parseconfig("--tx=2*popen")
+        sched = SingleCollectScheduling(config)
+        node1, node2 = MockNode(), MockNode()
+
+        # First node should be collector
+        sched.add_node(node1)
+        assert sched.first_node == node1
+
+        # Add second node
+        sched.add_node(node2)
+
+        # First node fails before collection
+        sched.remove_node(node1)
+
+        # Now first_node should be reset after removal
+        # Add a new node, it should become the collector
+        node3 = MockNode()
+        sched.add_node(node3)
+        # Verify the new node became the collector
+        assert sched.first_node == node3
+
+        # Complete collection with node3
+        collection = ["a.py::test_1", "a.py::test_2"]
+        sched.add_node_collection(node3, collection)
+        assert sched.collection == collection
+        assert sched.collection_done
+        assert sched.collection_is_completed
+
+
 class TestDistReporter:
     @pytest.mark.xfail
     def test_rsync_printing(self, pytester: pytest.Pytester, linecomp: Any) -> None: