You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I have several projects where the distribution of tests runtime is quite scattered, eg:
1000 tests of 10ms
100 tests of 1 minute
The current load scheduler comes short in this case, as it often ends up sending a batch of slow tests to the same worker.
As a workaround, I use a forked LoadScheduler that uses a fixed queue size (which I use with the minimum value of 2 -> each worker only has one test in its queue at any time):
class FixedLocalQueueLoadScheduling(LoadScheduling): # no cover
"""
A fork of pytest-xdist default load scheduler that uses a fixed size for workers local queue size.
"""
def __init__(self, config, log=None, queue_size=2):
super().__init__(config, log)
if queue_size < 2:
raise ValueError('Queue size must be at least 2')
self.queue_size = queue_size
def check_schedule(self, node, duration=0):
if node.shutting_down:
return
if self.pending:
node_pending = self.node2pending[node]
if len(node_pending) < self.queue_size:
num_send = self.queue_size - len(node_pending)
self._send_tests(node, num_send)
self.log("num items waiting for node:", len(self.pending))
def schedule(self):
assert self.collection_is_completed
# Initial distribution already happened, reschedule on all nodes
if self.collection is not None:
for node in self.nodes:
self.check_schedule(node)
return
# allow nodes to have different collections
if not self._check_nodes_have_same_collection():
self.log('**Different tests collected, aborting run**')
return
# Collections are identical, create the index of pending items.
self.collection = list(self.node2collection.values())[0]
self.pending[:] = range(len(self.collection))
if not self.collection:
return
# Send a batch of tests to run. If we don't have at least two
# tests per node, we have to send them all so that we can send
# shutdown signals and get all nodes working.
initial_batch = min(len(self.pending), self.queue_size * len(self.nodes))
# distribute tests round-robin up to the batch size
# (or until we run out)
nodes = cycle(self.nodes)
for i in range(initial_batch):
self._send_tests(next(nodes), 1)
if not self.pending:
# initial distribution sent all tests, start node shutdown
for node in self.nodes:
node.shutdown()
It would be nice to have at least one of these propositions implemented in xdist:
Integrate this scheduler (or an even simpler version where queue_size=2)
Make LoadScheduler configurable, so that users can provide initial_batch_size / items_per_node_min / items_per_node_max
When sending a batch of jobs to a node, shuffle like for the initial batch
Maybe improve/reduce a bit the defaults settings for initial_batch_size / items_per_node_min / items_per_node_max
The text was updated successfully, but these errors were encountered:
…nly spread load when a small proportion of the tests take much longer to run than the others. See pytest-dev#255. Add a new command line option to allow small batches to be used with --dist=loadsmall.
Maximum number of tests scheduled in one step.
Setting it to 1 will force pytest to send tests to workers one by one -
might be useful for a small number of slow tests.
Larger numbers will allow the scheduler to submit consecutive chunks of tests
to workers - allows reusing fixtures.
Unlimited if not set.
Fixes#855Fixes#255
I have several projects where the distribution of tests runtime is quite scattered, eg:
The current load scheduler comes short in this case, as it often ends up sending a batch of slow tests to the same worker.
As a workaround, I use a forked LoadScheduler that uses a fixed queue size (which I use with the minimum value of 2 -> each worker only has one test in its queue at any time):
It would be nice to have at least one of these propositions implemented in xdist:
The text was updated successfully, but these errors were encountered: