@@ -205,6 +205,76 @@ def test_live_execution_displays_subset_of_table(capsys, tmp_path, n_entries_in_
205
205
assert "│ ." in captured .out
206
206
207
207
208
@pytest.mark.unit
def test_live_execution_skips_do_not_crowd_out_displayed_tasks(capsys, tmp_path):
    """A flood of skipped tasks must not push reported outcomes out of the table."""
    path = tmp_path.joinpath("task_module.py")

    def _make_task(name):
        # Build a dummy task whose body is the identity function.
        new_task = PythonFunctionTask(
            name, path.as_posix() + f"::{name}", path, lambda x: x
        )
        new_task.short_name = f"task_module.py::{name}"
        return new_task

    running_task = _make_task("task_example")

    live_manager = LiveManager()
    live = LiveExecution(live_manager, 20, 1, "no_link")

    live_manager.start()
    live.update_running_tasks(running_task)
    live_manager.stop()

    # The table should show the single running task.
    captured = capsys.readouterr()
    assert "Task" in captured.out
    assert "Outcome" in captured.out
    assert "task_module.py::task_example" in captured.out
    assert "running" in captured.out

    # Register one task whose outcome is displayed plus many skipped tasks that
    # could potentially crowd the displayed one out of the table.
    successful_task = _make_task("task_success")
    skipped_tasks = [_make_task(f"task_skip_{i}") for i in range(25)]

    live_manager.start()
    live.update_running_tasks(successful_task)
    for skipped in skipped_tasks:
        live.update_running_tasks(skipped)
    live_manager.stop()

    # While running, every task is visible.
    captured = capsys.readouterr()
    assert "running" in captured.out
    assert "task_success" in captured.out
    for i in range(25):
        assert f"task_skip_{i}" in captured.out

    live_manager.resume()
    live.update_reports(
        ExecutionReport(
            task=successful_task, outcome=TaskOutcome.SUCCESS, exc_info=None
        )
    )
    for skipped in skipped_tasks:
        live.update_reports(
            ExecutionReport(task=skipped, outcome=TaskOutcome.SKIP, exc_info=None)
        )
    live_manager.stop()

    # Final table: reported outcomes remain visible, skips are filtered out.
    captured = capsys.readouterr()
    assert "Task" in captured.out
    assert "Outcome" in captured.out
    assert "task_module.py::task_example" in captured.out
    assert "task_module.py::task_success" in captured.out
    assert "running" in captured.out
    assert TaskOutcome.SUCCESS.symbol in captured.out
    assert "task_skip" not in captured.out
277
+
208
278
@pytest .mark .end_to_end
209
279
def test_full_execution_table_is_displayed_at_the_end_of_execution (tmp_path , runner ):
210
280
source = """
0 commit comments