From badd08358f6837138a359b8e5280e93aefc8ba3a Mon Sep 17 00:00:00 2001 From: Zori Date: Tue, 4 Mar 2025 00:13:30 +0400 Subject: [PATCH 1/5] Add 'singlecollect' distribution mode This adds a new 'singlecollect' distribution mode that only collects tests on the first worker node and skips redundant collection on other nodes. This can significantly improve startup time for large test suites with expensive collection. Key features: - Only the first worker performs test collection - Other workers skip collection verification entirely - Tests are distributed using the same algorithm as 'load' mode - Handles worker failures gracefully, including the collecting worker - Solves issues with floating parameters in pytest collection --- changelog/1180.feature.rst | 1 + src/xdist/dsession.py | 3 + src/xdist/plugin.py | 2 + src/xdist/scheduler/__init__.py | 1 + src/xdist/scheduler/singlecollect.py | 211 +++++++++++++++++++++++++++ testing/acceptance_test.py | 64 ++++++++ testing/test_dsession.py | 133 +++++++++++++++++ 7 files changed, 415 insertions(+) create mode 100644 changelog/1180.feature.rst create mode 100644 src/xdist/scheduler/singlecollect.py diff --git a/changelog/1180.feature.rst b/changelog/1180.feature.rst new file mode 100644 index 00000000..1c2ae4fd --- /dev/null +++ b/changelog/1180.feature.rst @@ -0,0 +1 @@ +Added new 'singlecollect' distribution mode that only collects tests once on a single worker and skips collection verification on other workers. This can significantly improve startup time for test suites with expensive collection. \ No newline at end of file diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 5bf7d980..2e53bc99 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -20,6 +20,7 @@ from xdist.scheduler import LoadScheduling from xdist.scheduler import LoadScopeScheduling from xdist.scheduler import Scheduling +from xdist.scheduler import SingleCollectScheduling from xdist.scheduler import WorkStealingScheduling from xdist.workermanage import NodeManager from xdist.workermanage import WorkerController @@ -123,6 +124,8 @@ def pytest_xdist_make_scheduler( return LoadGroupScheduling(config, log) if dist == "worksteal": return WorkStealingScheduling(config, log) + if dist == "singlecollect": + return SingleCollectScheduling(config, log) return None @pytest.hookimpl diff --git a/src/xdist/plugin.py b/src/xdist/plugin.py index 0cf90f86..624fe9f1 100644 --- a/src/xdist/plugin.py +++ b/src/xdist/plugin.py @@ -108,6 +108,7 @@ def pytest_addoption(parser: pytest.Parser) -> None: "loadfile", "loadgroup", "worksteal", + "singlecollect", "no", ], dest="dist", @@ -124,6 +125,7 @@ def pytest_addoption(parser: pytest.Parser) -> None: "loadgroup: Like 'load', but sends tests marked with 'xdist_group' to the same worker.\n\n" "worksteal: Split the test suite between available environments," " then re-balance when any worker runs out of tests.\n\n" + "singlecollect: Only collect tests once on a single worker and skip collection verification.\n\n" "(default) no: Run tests inprocess, don't distribute." ), ) diff --git a/src/xdist/scheduler/__init__.py b/src/xdist/scheduler/__init__.py index b4894732..7bbcbc77 100644 --- a/src/xdist/scheduler/__init__.py +++ b/src/xdist/scheduler/__init__.py @@ -4,4 +4,5 @@ from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling from xdist.scheduler.protocol import Scheduling as Scheduling +from xdist.scheduler.singlecollect import SingleCollectScheduling as SingleCollectScheduling from xdist.scheduler.worksteal import WorkStealingScheduling as WorkStealingScheduling diff --git a/src/xdist/scheduler/singlecollect.py b/src/xdist/scheduler/singlecollect.py new file mode 100644 index 00000000..eace7535 --- /dev/null +++ b/src/xdist/scheduler/singlecollect.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +from collections.abc import Sequence +from itertools import cycle + +import pytest + +from xdist.remote import Producer +from xdist.workermanage import parse_tx_spec_config +from xdist.workermanage import WorkerController + + +class SingleCollectScheduling: + """Implement scheduling with a single test collection phase. + + This differs from LoadScheduling by: + 1. Only collecting tests on the first node + 2. Skipping collection on other nodes + 3. Not checking for collection equality + + This can significantly improve startup time by avoiding redundant collection + and collection verification across multiple worker processes. + """ + + def __init__(self, config: pytest.Config, log: Producer | None = None) -> None: + self.numnodes = len(parse_tx_spec_config(config)) + self.node2pending: dict[WorkerController, list[int]] = {} + self.pending: list[int] = [] + self.collection: list[str] | None = None + self.first_node: WorkerController | None = None + if log is None: + self.log = Producer("singlecollect") + else: + self.log = log.singlecollect + self.config = config + self.maxschedchunk = self.config.getoption("maxschedchunk") + self.collection_done = False + + @property + def nodes(self) -> list[WorkerController]: + """A list of all nodes in the scheduler.""" + return list(self.node2pending.keys()) + + @property + def collection_is_completed(self) -> bool: + """Return True once we have collected tests from the first node.""" + return self.collection_done + + @property + def tests_finished(self) -> bool: + """Return True if all tests have been executed by the nodes.""" + if not self.collection_is_completed: + return False + if self.pending: + return False + for pending in self.node2pending.values(): + if len(pending) >= 2: + return False + return True + + @property + def has_pending(self) -> bool: + """Return True if there are pending test items.""" + if self.pending: + return True + for pending in self.node2pending.values(): + if pending: + return True + return False + + def add_node(self, node: WorkerController) -> None: + """Add a new node to the scheduler.""" + assert node not in self.node2pending + self.node2pending[node] = [] + + # Remember the first node as our collector + if self.first_node is None: + self.first_node = node + self.log(f"Using {node.gateway.id} as collection node") + + def add_node_collection( + self, node: WorkerController, collection: Sequence[str] + ) -> None: + """Only use collection from the first node.""" + # We only care about collection from the first node + if node == self.first_node: + self.log(f"Received collection from first node {node.gateway.id}") + self.collection = list(collection) + self.collection_done = True + else: + # Skip collection verification for other nodes + self.log(f"Ignoring collection from node {node.gateway.id}") + + def mark_test_complete( + self, node: WorkerController, item_index: int, duration: float = 0 + ) -> None: + """Mark test item as completed by node.""" + self.node2pending[node].remove(item_index) + self.check_schedule(node, duration=duration) + + def mark_test_pending(self, item: str) -> None: + assert self.collection is not None + self.pending.insert( + 0, + self.collection.index(item), + ) + for node in self.node2pending: + self.check_schedule(node) + + def remove_pending_tests_from_node( + self, + node: WorkerController, + indices: Sequence[int], + ) -> None: + raise NotImplementedError() + + def check_schedule(self, node: WorkerController, duration: float = 0) -> None: + """Maybe schedule new items on the node.""" + if node.shutting_down: + return + + if self.pending: + # how many nodes do we have? + num_nodes = len(self.node2pending) + # if our node goes below a heuristic minimum, fill it out to + # heuristic maximum + items_per_node_min = max(2, len(self.pending) // num_nodes // 4) + items_per_node_max = max(2, len(self.pending) // num_nodes // 2) + node_pending = self.node2pending[node] + if len(node_pending) < items_per_node_min: + if duration >= 0.1 and len(node_pending) >= 2: + # seems the node is doing long-running tests + # and has enough items to continue + # so let's rather wait with sending new items + return + num_send = items_per_node_max - len(node_pending) + # keep at least 2 tests pending even if --maxschedchunk=1 + maxschedchunk = max(2 - len(node_pending), self.maxschedchunk) + self._send_tests(node, min(num_send, maxschedchunk)) + else: + node.shutdown() + + self.log("num items waiting for node:", len(self.pending)) + + def remove_node(self, node: WorkerController) -> str | None: + """Remove a node from the scheduler.""" + pending = self.node2pending.pop(node) + + # If this is the first node (collector), reset it + if node == self.first_node: + self.first_node = None + + if not pending: + return None + + # Reassign pending items if the node had any + assert self.collection is not None + crashitem = self.collection[pending.pop(0)] + self.pending.extend(pending) + for node in self.node2pending: + self.check_schedule(node) + return crashitem + + def schedule(self) -> None: + """Initiate distribution of the test collection.""" + assert self.collection_is_completed + + # Initial distribution already happened, reschedule on all nodes + if self.pending: + for node in self.nodes: + self.check_schedule(node) + return + + # Initialize the index of pending items + assert self.collection is not None + self.pending[:] = range(len(self.collection)) + if not self.collection: + return + + if self.maxschedchunk is None: + self.maxschedchunk = len(self.collection) + + # Send a batch of tests to run. If we don't have at least two + # tests per node, we have to send them all so that we can send + # shutdown signals and get all nodes working. + if len(self.pending) < 2 * len(self.nodes): + # Distribute tests round-robin + nodes = cycle(self.nodes) + for _ in range(len(self.pending)): + self._send_tests(next(nodes), 1) + else: + # how many items per node do we have about? + items_per_node = len(self.collection) // len(self.node2pending) + # take a fraction of tests for initial distribution + node_chunksize = min(items_per_node // 4, self.maxschedchunk) + node_chunksize = max(node_chunksize, 2) + # and initialize each node with a chunk of tests + for node in self.nodes: + self._send_tests(node, node_chunksize) + + if not self.pending: + # initial distribution sent all tests, start node shutdown + for node in self.nodes: + node.shutdown() + + def _send_tests(self, node: WorkerController, num: int) -> None: + tests_per_node = self.pending[:num] + if tests_per_node: + del self.pending[:num] + self.node2pending[node].extend(tests_per_node) + node.send_runtest_some(tests_per_node) \ No newline at end of file diff --git a/testing/acceptance_test.py b/testing/acceptance_test.py index 3ef10cc9..6924edcd 100644 --- a/testing/acceptance_test.py +++ b/testing/acceptance_test.py @@ -10,6 +10,70 @@ import xdist +class TestSingleCollectScheduling: + def test_singlecollect_mode(self, pytester: pytest.Pytester) -> None: + """Test that the singlecollect distribution mode works.""" + # Create a simple test file + p1 = pytester.makepyfile( + """ + def test_ok(): + pass + """ + ) + result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v") + assert result.ret == 0 + result.stdout.fnmatch_lines(["*1 passed*"]) + # Make sure the tests are correctly distributed + result.stdout.fnmatch_lines(["*scheduling tests via SingleCollectScheduling*"]) + + def test_singlecollect_many_tests(self, pytester: pytest.Pytester) -> None: + """Test that the singlecollect mode correctly distributes many tests.""" + # Create test file with multiple tests + p1 = pytester.makepyfile( + """ + import pytest + @pytest.mark.parametrize("x", range(10)) + def test_ok(x): + assert True + """ + ) + result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v") + assert result.ret == 0 + result.stdout.fnmatch_lines(["*passed*"]) + # Make sure the tests are correctly distributed + result.stdout.fnmatch_lines(["*scheduling tests via SingleCollectScheduling*"]) + + def test_singlecollect_failure(self, pytester: pytest.Pytester) -> None: + """Test that failures are correctly reported with singlecollect mode.""" + p1 = pytester.makepyfile( + """ + def test_fail(): + assert 0 + """ + ) + result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v") + assert result.ret == 1 + result.stdout.fnmatch_lines(["*1 failed*"]) + + def test_singlecollect_handles_fixtures(self, pytester: pytest.Pytester) -> None: + """Test that fixtures work correctly with singlecollect mode.""" + pytester.makepyfile( + """ + import pytest + + @pytest.fixture + def my_fixture(): + return 42 + + def test_with_fixture(my_fixture): + assert my_fixture == 42 + """ + ) + result = pytester.runpytest("-n2", "--dist=singlecollect", "-v") + assert result.ret == 0 + result.stdout.fnmatch_lines(["*1 passed*"]) + + class TestDistribution: def test_n1_pass(self, pytester: pytest.Pytester) -> None: p1 = pytester.makepyfile( diff --git a/testing/test_dsession.py b/testing/test_dsession.py index 680b7ae0..07483f20 100644 --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -15,6 +15,7 @@ from xdist.report import report_collection_diff from xdist.scheduler import EachScheduling from xdist.scheduler import LoadScheduling +from xdist.scheduler import SingleCollectScheduling from xdist.scheduler import WorkStealingScheduling from xdist.workermanage import WorkerController @@ -467,6 +468,138 @@ def pytest_collectreport(self, report: pytest.CollectReport) -> None: assert "Different tests were collected between" in rep.longrepr +class TestSingleCollectScheduling: + def test_initialization(self, pytester: pytest.Pytester) -> None: + config = pytester.parseconfig("--tx=2*popen") + sched = SingleCollectScheduling(config) + assert sched.first_node is None + assert sched.collection is None + assert not sched.collection_done + assert not sched.collection_is_completed + assert len(sched.node2pending) == 0 + + def test_add_node_and_collection(self, pytester: pytest.Pytester) -> None: + config = pytester.parseconfig("--tx=2*popen") + sched = SingleCollectScheduling(config) + node1, node2 = MockNode(), MockNode() + + # First node should be set as collector + sched.add_node(node1) + assert sched.first_node == node1 + + # Second node should not become the collector + sched.add_node(node2) + assert sched.first_node == node1 + + # Collection from first node should be used + collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3"] + sched.add_node_collection(node1, collection) + assert sched.collection == collection + assert sched.collection_done + assert sched.collection_is_completed + + # Collection from second node should be ignored + different_collection = ["a.py::test_1", "a.py::test_4"] + sched.add_node_collection(node2, different_collection) + # Collection should not change - still using the first node's collection + assert sched.collection == collection + + def test_schedule_tests(self, pytester: pytest.Pytester) -> None: + config = pytester.parseconfig("--tx=2*popen") + sched = SingleCollectScheduling(config) + node1, node2 = MockNode(), MockNode() + sched.add_node(node1) + sched.add_node(node2) + + collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3", "a.py::test_4"] + sched.add_node_collection(node1, collection) + + # Should use collection from node1 and distribute tests + sched.schedule() + + # Check that tests were distributed across both nodes + assert len(node1.sent) > 0 + assert len(node2.sent) > 0 + + # Tests should be distributed completely + all_tests = node1.sent + node2.sent + assert sorted(all_tests) == list(range(len(collection))) + + # The pending list should be empty after distribution + assert not sched.pending + + def test_handle_node_failure(self, pytester: pytest.Pytester) -> None: + config = pytester.parseconfig("--tx=2*popen") + sched = SingleCollectScheduling(config) + node1, node2 = MockNode(), MockNode() + sched.add_node(node1) + sched.add_node(node2) + + collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3", "a.py::test_4"] + sched.add_node_collection(node1, collection) + sched.schedule() + + # Simulate node1 completing a test + test_idx = node1.sent[0] + sched.mark_test_complete(node1, test_idx) + + # Now remove node2 (simulating failure) + crashitem = sched.remove_node(node2) + + # Tests assigned to node2 should go back to pending + assert len(sched.pending) > 0 + + # Add a new node + node3 = MockNode() + sched.add_node(node3) + + # Since collection is already completed, schedule should assign pending tests to node3 + sched.schedule() + assert len(node3.sent) > 0 + + # Complete all tests + for idx in node1.sent: + if idx != test_idx: # Skip the one we already completed + sched.mark_test_complete(node1, idx) + + for idx in node3.sent: + sched.mark_test_complete(node3, idx) + + # All tests should be completed + assert sched.tests_finished + + def test_first_node_failure(self, pytester: pytest.Pytester) -> None: + """Test what happens when the collecting node fails before collection is done.""" + config = pytester.parseconfig("--tx=2*popen") + sched = SingleCollectScheduling(config) + node1, node2 = MockNode(), MockNode() + + # First node should be collector + sched.add_node(node1) + assert sched.first_node == node1 + + # Add second node + sched.add_node(node2) + + # First node fails before collection + sched.remove_node(node1) + + # Now second node should become the collector + assert sched.first_node is None # first_node is reset after removal + + # Add a new node, it should become the collector + node3 = MockNode() + sched.add_node(node3) + assert sched.first_node == node3 + + # Complete collection with node3 + collection = ["a.py::test_1", "a.py::test_2"] + sched.add_node_collection(node3, collection) + assert sched.collection == collection + assert sched.collection_done + assert sched.collection_is_completed + + class TestDistReporter: @pytest.mark.xfail def test_rsync_printing(self, pytester: pytest.Pytester, linecomp: Any) -> None: From 58083487578558b83f0337b8da301ba2f1a73f7e Mon Sep 17 00:00:00 2001 From: zorhay Date: Tue, 4 Mar 2025 01:00:23 +0400 Subject: [PATCH 2/5] [FIX] Fix mypy errors. --- src/xdist/scheduler/singlecollect.py | 14 +++++++------- testing/acceptance_test.py | 3 +-- testing/test_dsession.py | 3 ++- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/xdist/scheduler/singlecollect.py b/src/xdist/scheduler/singlecollect.py index eace7535..ebea266e 100644 --- a/src/xdist/scheduler/singlecollect.py +++ b/src/xdist/scheduler/singlecollect.py @@ -12,12 +12,12 @@ class SingleCollectScheduling: """Implement scheduling with a single test collection phase. - + This differs from LoadScheduling by: 1. Only collecting tests on the first node 2. Skipping collection on other nodes 3. Not checking for collection equality - + This can significantly improve startup time by avoiding redundant collection and collection verification across multiple worker processes. """ @@ -72,7 +72,7 @@ def add_node(self, node: WorkerController) -> None: """Add a new node to the scheduler.""" assert node not in self.node2pending self.node2pending[node] = [] - + # Remember the first node as our collector if self.first_node is None: self.first_node = node @@ -92,10 +92,10 @@ def add_node_collection( self.log(f"Ignoring collection from node {node.gateway.id}") def mark_test_complete( - self, node: WorkerController, item_index: int, duration: float = 0 + self, node: WorkerController, item_index: int | str, duration: float = 0 ) -> None: """Mark test item as completed by node.""" - self.node2pending[node].remove(item_index) + self.node2pending[node].remove(int(item_index) if isinstance(item_index, str) else item_index) self.check_schedule(node, duration=duration) def mark_test_pending(self, item: str) -> None: @@ -145,11 +145,11 @@ def check_schedule(self, node: WorkerController, duration: float = 0) -> None: def remove_node(self, node: WorkerController) -> str | None: """Remove a node from the scheduler.""" pending = self.node2pending.pop(node) - + # If this is the first node (collector), reset it if node == self.first_node: self.first_node = None - + if not pending: return None diff --git a/testing/acceptance_test.py b/testing/acceptance_test.py index 6924edcd..4582bccd 100644 --- a/testing/acceptance_test.py +++ b/testing/acceptance_test.py @@ -60,11 +60,10 @@ def test_singlecollect_handles_fixtures(self, pytester: pytest.Pytester) -> None pytester.makepyfile( """ import pytest - + @pytest.fixture def my_fixture(): return 42 - def test_with_fixture(my_fixture): assert my_fixture == 42 """ diff --git a/testing/test_dsession.py b/testing/test_dsession.py index 07483f20..a13d6af7 100644 --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -544,7 +544,7 @@ def test_handle_node_failure(self, pytester: pytest.Pytester) -> None: sched.mark_test_complete(node1, test_idx) # Now remove node2 (simulating failure) - crashitem = sched.remove_node(node2) + sched.remove_node(node2) # Tests assigned to node2 should go back to pending assert len(sched.pending) > 0 @@ -590,6 +590,7 @@ def test_first_node_failure(self, pytester: pytest.Pytester) -> None: # Add a new node, it should become the collector node3 = MockNode() sched.add_node(node3) + # Verify the new node became the collector assert sched.first_node == node3 # Complete collection with node3 From 458c8c4a649027b56dac4ac27289b6e37584b285 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Mar 2025 21:19:53 +0000 Subject: [PATCH 3/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/xdist/scheduler/__init__.py | 4 ++- src/xdist/scheduler/singlecollect.py | 6 ++-- testing/acceptance_test.py | 6 ++-- testing/test_dsession.py | 48 ++++++++++++++-------------- 4 files changed, 34 insertions(+), 30 deletions(-) diff --git a/src/xdist/scheduler/__init__.py b/src/xdist/scheduler/__init__.py index 7bbcbc77..e531f91e 100644 --- a/src/xdist/scheduler/__init__.py +++ b/src/xdist/scheduler/__init__.py @@ -4,5 +4,7 @@ from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling from xdist.scheduler.protocol import Scheduling as Scheduling -from xdist.scheduler.singlecollect import SingleCollectScheduling as SingleCollectScheduling +from xdist.scheduler.singlecollect import ( + SingleCollectScheduling as SingleCollectScheduling, +) from xdist.scheduler.worksteal import WorkStealingScheduling as WorkStealingScheduling diff --git a/src/xdist/scheduler/singlecollect.py b/src/xdist/scheduler/singlecollect.py index ebea266e..40626d3d 100644 --- a/src/xdist/scheduler/singlecollect.py +++ b/src/xdist/scheduler/singlecollect.py @@ -95,7 +95,9 @@ def mark_test_complete( self, node: WorkerController, item_index: int | str, duration: float = 0 ) -> None: """Mark test item as completed by node.""" - self.node2pending[node].remove(int(item_index) if isinstance(item_index, str) else item_index) + self.node2pending[node].remove( + int(item_index) if isinstance(item_index, str) else item_index + ) self.check_schedule(node, duration=duration) def mark_test_pending(self, item: str) -> None: @@ -208,4 +210,4 @@ def _send_tests(self, node: WorkerController, num: int) -> None: if tests_per_node: del self.pending[:num] self.node2pending[node].extend(tests_per_node) - node.send_runtest_some(tests_per_node) \ No newline at end of file + node.send_runtest_some(tests_per_node) diff --git a/testing/acceptance_test.py b/testing/acceptance_test.py index 4582bccd..a717b68c 100644 --- a/testing/acceptance_test.py +++ b/testing/acceptance_test.py @@ -25,7 +25,7 @@ def test_ok(): result.stdout.fnmatch_lines(["*1 passed*"]) # Make sure the tests are correctly distributed result.stdout.fnmatch_lines(["*scheduling tests via SingleCollectScheduling*"]) - + def test_singlecollect_many_tests(self, pytester: pytest.Pytester) -> None: """Test that the singlecollect mode correctly distributes many tests.""" # Create test file with multiple tests @@ -42,7 +42,7 @@ def test_ok(x): result.stdout.fnmatch_lines(["*passed*"]) # Make sure the tests are correctly distributed result.stdout.fnmatch_lines(["*scheduling tests via SingleCollectScheduling*"]) - + def test_singlecollect_failure(self, pytester: pytest.Pytester) -> None: """Test that failures are correctly reported with singlecollect mode.""" p1 = pytester.makepyfile( @@ -54,7 +54,7 @@ def test_fail(): result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v") assert result.ret == 1 result.stdout.fnmatch_lines(["*1 failed*"]) - + def test_singlecollect_handles_fixtures(self, pytester: pytest.Pytester) -> None: """Test that fixtures work correctly with singlecollect mode.""" pytester.makepyfile( diff --git a/testing/test_dsession.py b/testing/test_dsession.py index a13d6af7..f5e1aabb 100644 --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -482,22 +482,22 @@ def test_add_node_and_collection(self, pytester: pytest.Pytester) -> None: config = pytester.parseconfig("--tx=2*popen") sched = SingleCollectScheduling(config) node1, node2 = MockNode(), MockNode() - + # First node should be set as collector sched.add_node(node1) assert sched.first_node == node1 - + # Second node should not become the collector sched.add_node(node2) assert sched.first_node == node1 - + # Collection from first node should be used collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3"] sched.add_node_collection(node1, collection) assert sched.collection == collection assert sched.collection_done assert sched.collection_is_completed - + # Collection from second node should be ignored different_collection = ["a.py::test_1", "a.py::test_4"] sched.add_node_collection(node2, different_collection) @@ -510,21 +510,21 @@ def test_schedule_tests(self, pytester: pytest.Pytester) -> None: node1, node2 = MockNode(), MockNode() sched.add_node(node1) sched.add_node(node2) - + collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3", "a.py::test_4"] sched.add_node_collection(node1, collection) - + # Should use collection from node1 and distribute tests sched.schedule() - + # Check that tests were distributed across both nodes assert len(node1.sent) > 0 assert len(node2.sent) > 0 - + # Tests should be distributed completely all_tests = node1.sent + node2.sent assert sorted(all_tests) == list(range(len(collection))) - + # The pending list should be empty after distribution assert not sched.pending @@ -534,37 +534,37 @@ def test_handle_node_failure(self, pytester: pytest.Pytester) -> None: node1, node2 = MockNode(), MockNode() sched.add_node(node1) sched.add_node(node2) - + collection = ["a.py::test_1", "a.py::test_2", "a.py::test_3", "a.py::test_4"] sched.add_node_collection(node1, collection) sched.schedule() - + # Simulate node1 completing a test test_idx = node1.sent[0] sched.mark_test_complete(node1, test_idx) - + # Now remove node2 (simulating failure) sched.remove_node(node2) - + # Tests assigned to node2 should go back to pending assert len(sched.pending) > 0 - + # Add a new node node3 = MockNode() sched.add_node(node3) - + # Since collection is already completed, schedule should assign pending tests to node3 sched.schedule() assert len(node3.sent) > 0 - + # Complete all tests for idx in node1.sent: if idx != test_idx: # Skip the one we already completed sched.mark_test_complete(node1, idx) - + for idx in node3.sent: sched.mark_test_complete(node3, idx) - + # All tests should be completed assert sched.tests_finished @@ -573,26 +573,26 @@ def test_first_node_failure(self, pytester: pytest.Pytester) -> None: config = pytester.parseconfig("--tx=2*popen") sched = SingleCollectScheduling(config) node1, node2 = MockNode(), MockNode() - + # First node should be collector sched.add_node(node1) assert sched.first_node == node1 - + # Add second node sched.add_node(node2) - + # First node fails before collection sched.remove_node(node1) - + # Now second node should become the collector assert sched.first_node is None # first_node is reset after removal - + # Add a new node, it should become the collector node3 = MockNode() sched.add_node(node3) # Verify the new node became the collector assert sched.first_node == node3 - + # Complete collection with node3 collection = ["a.py::test_1", "a.py::test_2"] sched.add_node_collection(node3, collection) From 615cc9c4e09ff82bf8bae1f7634db31f6dd61fc7 Mon Sep 17 00:00:00 2001 From: zorhay Date: Tue, 4 Mar 2025 01:27:44 +0400 Subject: [PATCH 4/5] [FIX] Fix mypy errors. --- testing/test_dsession.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/testing/test_dsession.py b/testing/test_dsession.py index f5e1aabb..00da474b 100644 --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -583,10 +583,8 @@ def test_first_node_failure(self, pytester: pytest.Pytester) -> None: # First node fails before collection sched.remove_node(node1) - - # Now second node should become the collector - assert sched.first_node is None # first_node is reset after removal - + + # Now first_node should be reset after removal # Add a new node, it should become the collector node3 = MockNode() sched.add_node(node3) From 90e87f559edda46c2b487e10c2a07e272a7b19f4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Mar 2025 21:29:50 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- testing/test_dsession.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/test_dsession.py b/testing/test_dsession.py index 00da474b..a1020d0b 100644 --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -583,7 +583,7 @@ def test_first_node_failure(self, pytester: pytest.Pytester) -> None: # First node fails before collection sched.remove_node(node1) - + # Now first_node should be reset after removal # Add a new node, it should become the collector node3 = MockNode()