Implement a first version of batching #756
Conversation
""" WalkthroughA new batching mechanism was added to the Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant Executor
    participant TaskScheduler
    participant Future
    User->>Executor: submit tasks (returns Futures)
    User->>TaskScheduler: call batched([Futures], n)
    loop For each batch
        TaskScheduler->>TaskScheduler: call batched_futures with skip_lst
        TaskScheduler->>TaskScheduler: submit batch task (batched)
        TaskScheduler->>Future: return batch Future
        TaskScheduler->>TaskScheduler: update skip_lst with new Future
    end
    User->>Executor: submit sum task for each batch
    Executor->>Future: return sum Future
    User->>Future: collect results
```
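To make the diagram concrete, here is a minimal usage sketch based on the examples later in this thread. It is an illustration only: whether the entry point is `exe.batched()` or the private `exe._task_scheduler.batched()` changed while the PR evolved.

```python
# Hypothetical usage sketch of the batching flow shown in the diagram.
from executorlib import SingleNodeExecutor


def identity(x):
    return x


def sum_batch(batch):
    # batch is the list of results collected for one batch
    return sum(batch)


with SingleNodeExecutor() as exe:
    futures = [exe.submit(identity, i) for i in range(10)]       # 10 tasks
    batch_futures = exe.batched(futures, n=3)                     # 4 batch futures: [0,1,2], [3,4,5], [6,7,8], [9]
    sum_futures = [exe.submit(sum_batch, b) for b in batch_futures]
    print(sum(f.result() for f in sum_futures))                   # 45
```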
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~15 minutes
Codecov Report: ✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##             main     #756      +/-   ##
==========================================
+ Coverage   97.47%   97.52%   +0.05%
==========================================
  Files          32       33       +1
  Lines        1423     1454      +31
==========================================
+ Hits         1387     1418      +31
  Misses         36       36
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 2
🧹 Nitpick comments (3)
executorlib/task_scheduler/interactive/dependency.py (2)
156-172: Fix unused loop variable and improve readability. The loop variable `j` is not used within the loop body. Additionally, the logic for calculating the number of iterations could be clearer. Apply this diff to address the static analysis warning and improve readability:

```diff
- for j in range(len(iterable) // n + (1 if len(iterable) % n > 0 else 0)):
+ num_batches = (len(iterable) + n - 1) // n  # Ceiling division
+ for _ in range(num_batches):
```
161-162: Improve error message consistency. The error message format doesn't match Python's standard error message format for missing positional arguments. Apply this diff for better consistency:

```diff
- raise TypeError("batched() missing required argument 'n' (pos 2)")
+ raise TypeError("batched() missing 1 required positional argument: 'n'")
```

tests/test_singlenodeexecutor_dependencies.py (1)
62-62: Consider using a more descriptive function name. While `return_input_dict` works, the name is misleading since it's being called with integers, not dictionaries. Consider using a simpler identity function or renaming. You could use a lambda or create a more appropriately named function:

```diff
- future_first_lst.append(exe.submit(return_input_dict, i))
+ future_first_lst.append(exe.submit(lambda x: x, i))
```

Or define a new helper function:

```python
def identity(x):
    return x
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- executorlib/task_scheduler/interactive/dependency.py (2 hunks)
- tests/test_singlenodeexecutor_dependencies.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
tests/test_singlenodeexecutor_dependencies.py (3)
- executorlib/executor/single.py (1): SingleNodeExecutor (20-189)
- executorlib/task_scheduler/interactive/dependency.py (2): submit (112-154), batched (156-172)
- tests/test_singlenodeexecutor_plot_dependency.py (1): return_input_dict (42-43)
🪛 Ruff (0.12.2)
executorlib/task_scheduler/interactive/dependency.py
165-165: Loop control variable `j` not used within loop body. Rename unused `j` to `_j`. (B007)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: notebooks_integration
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_flux_mpich
- GitHub Check: unittest_win
- GitHub Check: unittest_slurm_mpich
🔇 Additional comments (1)
tests/test_singlenodeexecutor_dependencies.py (1)
58-71: Test logic is correct and well-structured. The test effectively validates the batching functionality:
- Creates 10 futures with values 0-9 (sum = 45)
- Batches into groups of 3, expecting 4 batches: [0,1,2], [3,4,5], [6,7,8], [9]
- Sums each batch and verifies total sum equals 45
- Correctly expects 4 batch results
The test provides good coverage for the new batching feature.
```python
def _batched(lst, n=None, skip_lst=None):
    if skip_lst is None:
        tmp_lst = []
    else:
        tmp_lst = []
        for el in skip_lst:
            if isinstance(el, Future):
                result = el.result()
                if isinstance(result, list):
                    tmp_lst += el
                else:
                    tmp_lst += [el]
            else:
                tmp_lst += el

    done_lst = []
    while len(done_lst) < n and len(done_lst) < len(lst) - len(tmp_lst):
        for v in lst:
            if v not in tmp_lst and v not in done_lst:
                done_lst.append(v)
                if len(done_lst) == n:
                    break

    return done_lst
```
Critical bugs in the _batched helper function.
The function has several serious issues that will cause runtime errors and incorrect behavior:
- Line 319: `tmp_lst += el` should be `tmp_lst += result`; this will cause a TypeError since `el` is a Future object, not iterable
- Line 321: `tmp_lst += [el]` should be `tmp_lst.append(result)`; same issue with the Future object
- Line 323: `tmp_lst += el` should handle the case where `el` might not be a list
Apply this diff to fix the critical bugs:
```diff
 def _batched(lst, n=None, skip_lst=None):
     if skip_lst is None:
         tmp_lst = []
     else:
         tmp_lst = []
         for el in skip_lst:
             if isinstance(el, Future):
                 result = el.result()
                 if isinstance(result, list):
-                    tmp_lst += el
+                    tmp_lst += result
                 else:
-                    tmp_lst += [el]
+                    tmp_lst.append(result)
             else:
-                tmp_lst += el
+                if isinstance(el, list):
+                    tmp_lst += el
+                else:
+                    tmp_lst.append(el)
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def _batched(lst, n=None, skip_lst=None):
    if skip_lst is None:
        tmp_lst = []
    else:
        tmp_lst = []
        for el in skip_lst:
            if isinstance(el, Future):
                result = el.result()
                if isinstance(result, list):
                    tmp_lst += result
                else:
                    tmp_lst.append(result)
            else:
                if isinstance(el, list):
                    tmp_lst += el
                else:
                    tmp_lst.append(el)

    done_lst = []
    while len(done_lst) < n and len(done_lst) < len(lst) - len(tmp_lst):
        for v in lst:
            if v not in tmp_lst and v not in done_lst:
                done_lst.append(v)
                if len(done_lst) == n:
                    break

    return done_lst
```
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/dependency.py around lines 310 to 333,
the _batched function incorrectly handles Future objects and list concatenations
causing runtime errors. Replace 'tmp_lst += el' on line 319 with 'tmp_lst +=
result' to add the resolved result list, change 'tmp_lst += [el]' on line 321 to
'tmp_lst.append(result)' to append the single result, and modify 'tmp_lst += el'
on line 323 to properly check if el is iterable before extending tmp_lst or else
append el to avoid type errors.
🛠️ Refactor suggestion
Optimize the batching algorithm for better performance.
The current nested loop approach has O(n²) time complexity in the worst case. The algorithm repeatedly searches through the entire list for each item to add to the batch.
Here's a more efficient O(n) approach:
```diff
 def _batched(lst, n=None, skip_lst=None):
     if skip_lst is None:
         tmp_lst = []
     else:
         tmp_lst = []
         for el in skip_lst:
             if isinstance(el, Future):
                 result = el.result()
                 if isinstance(result, list):
                     tmp_lst += result
                 else:
                     tmp_lst.append(result)
             else:
                 if isinstance(el, list):
                     tmp_lst += el
                 else:
                     tmp_lst.append(el)

-    done_lst = []
-    while len(done_lst) < n and len(done_lst) < len(lst) - len(tmp_lst):
-        for v in lst:
-            if v not in tmp_lst and v not in done_lst:
-                done_lst.append(v)
-                if len(done_lst) == n:
-                    break
-
-    return done_lst
+    # Create a set for O(1) lookup performance
+    tmp_set = set(tmp_lst)
+    done_lst = []
+
+    for v in lst:
+        if v not in tmp_set and len(done_lst) < n:
+            done_lst.append(v)
+            tmp_set.add(v)  # Prevent duplicates in current batch
+
+    return done_lst
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/dependency.py around lines 310 to 333,
the current _batched function uses nested loops causing O(n²) complexity. To fix
this, refactor the function to use a single pass through the list, maintaining a
set of skip elements for O(1) membership checks, and append elements to the
result list until the batch size n is reached. This eliminates repeated searches
and improves performance to O(n).
Actionable comments posted: 0
♻️ Duplicate comments (2)
executorlib/standalone/batched.py (2)
4-18: Critical bugs in skip_lst processing that will cause runtime errors. The function has several serious issues identical to those flagged in past reviews:

- Line 13: `tmp_lst += el` should be `tmp_lst += result`; this will cause a TypeError since `el` is a Future object, not iterable
- Line 15: `tmp_lst += [el]` should append `result`, not the Future object `el`
- Line 17: `tmp_lst += el` assumes `el` is iterable but doesn't handle non-list cases properly

Apply this diff to fix the critical bugs:

```diff
-        if isinstance(el, Future):
-            result = el.result()
-            if isinstance(result, list):
-                tmp_lst += el
-            else:
-                tmp_lst += [el]
-        else:
-            tmp_lst += el
+        if isinstance(el, Future):
+            result = el.result()
+            if isinstance(result, list):
+                tmp_lst += result
+            else:
+                tmp_lst.append(result)
+        else:
+            if isinstance(el, list):
+                tmp_lst += el
+            else:
+                tmp_lst.append(el)
```
19-27: Optimize the batching algorithm for better performance. The current nested loop approach has O(n²) time complexity; the algorithm repeatedly searches through the entire list for each item to add to the batch. Here's a more efficient O(n) approach:

```diff
-    done_lst = []
-    while len(done_lst) < n and len(done_lst) < len(lst) - len(tmp_lst):
-        for v in lst:
-            if v not in tmp_lst and v not in done_lst:
-                done_lst.append(v)
-                if len(done_lst) == n:
-                    break
-
-    return done_lst
+    # Create a set for O(1) lookup performance
+    tmp_set = set(tmp_lst)
+    done_lst = []
+
+    for v in lst:
+        if v not in tmp_set and len(done_lst) < n:
+            done_lst.append(v)
+            tmp_set.add(v)  # Prevent duplicates in current batch
+
+    return done_lst
```
🧹 Nitpick comments (2)
executorlib/task_scheduler/interactive/dependency.py (1)
166-166: Rename unused loop variable for clarity. The loop control variable `j` is not used within the loop body. Apply this diff:

```diff
- for j in range(len(iterable) // n + (1 if len(iterable) % n > 0 else 0)):
+ for _ in range(len(iterable) // n + (1 if len(iterable) % n > 0 else 0)):
```

tests/test_standalone_batched.py (1)
6-11: Good basic test coverage, but consider additional test cases. The test covers the fundamental batching logic well, but could benefit from additional edge cases and Future object testing. Consider adding these test cases to improve coverage:

```python
def test_batched_edge_cases(self):
    lst = list(range(10))
    # Test with n=0
    self.assertEqual(batched(lst=lst, n=0), [])
    # Test with n > len(lst)
    self.assertEqual(batched(lst=lst, n=15), lst)
    # Test with empty list
    self.assertEqual(batched(lst=[], n=3), [])
    # Test with Future objects in skip_lst (once bugs are fixed)
    from concurrent.futures import Future
    future = Future()
    future.set_result([0, 1, 2])
    self.assertEqual(batched(lst=lst, skip_lst=[future], n=3), [3, 4, 5])
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- executorlib/standalone/batched.py (1 hunks)
- executorlib/task_scheduler/interactive/dependency.py (2 hunks)
- tests/test_standalone_batched.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
executorlib/task_scheduler/interactive/dependency.py
166-166: Loop control variable `j` not used within loop body. Rename unused `j` to `_j`. (B007)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: unittest_old
- GitHub Check: pip_check
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: notebooks_integration
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_win
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_slurm_mpich
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: notebooks
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_flux_mpich
🔇 Additional comments (3)
executorlib/task_scheduler/interactive/dependency.py (3)
7-7: LGTM: Proper import of the standalone batched function. The import correctly brings in the standalone batched function for use in the scheduler.

157-165: LGTM: Good error handling and initialization. The method properly validates the required `n` parameter and initializes the necessary data structures.

167-172: LGTM: Correct batching implementation with proper skip_lst management. The method correctly submits batched tasks and manages the skip_lst by copying and appending new futures. The comment "be careful" appropriately highlights the importance of the copy operation.
Actionable comments posted: 2
🧹 Nitpick comments (1)
tests/test_singlenodeexecutor_dependencies.py (1)
77-80: Good error handling test, but simplify the nested with statements. The test correctly validates that calling `batched()` with an empty list raises a `TypeError`. However, the nested `with` statements can be combined for better readability. Apply this diff to combine the with statements:

```diff
 def test_batched_error(self):
-    with self.assertRaises(TypeError):
-        with SingleNodeExecutor() as exe:
-            exe._task_scheduler.batched([])
+    with self.assertRaises(TypeError), SingleNodeExecutor() as exe:
+        exe._task_scheduler.batched([])
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- executorlib/standalone/batched.py (1 hunks)
- executorlib/task_scheduler/interactive/dependency.py (3 hunks)
- tests/test_singlenodeexecutor_dependencies.py (2 hunks)
- tests/test_standalone_batched.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- executorlib/task_scheduler/interactive/dependency.py
🧰 Additional context used
🧬 Code Graph Analysis (2)
executorlib/standalone/batched.py (1)
- executorlib/task_scheduler/file/shared.py (2): result (25-39), done (41-49)

tests/test_singlenodeexecutor_dependencies.py (3)
- executorlib/executor/single.py (1): SingleNodeExecutor (20-189)
- executorlib/task_scheduler/interactive/dependency.py (2): submit (113-155), batched (157-182)
- tests/test_singlenodeexecutor_plot_dependency.py (1): return_input_dict (42-43)
🪛 Ruff (0.12.2)
tests/test_singlenodeexecutor_dependencies.py
78-79: Use a single `with` statement with multiple contexts instead of nested `with` statements. Combine `with` statements. (SIM117)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks_integration
- GitHub Check: unittest_flux_mpich
- GitHub Check: unittest_flux_openmpi
- GitHub Check: notebooks
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_win
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
🔇 Additional comments (3)
executorlib/standalone/batched.py (1)
19-23: Function implementation is correct. The `_merge_lists` helper function correctly flattens a list of lists. However, ensure the calling code passes lists of lists as expected.
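For reference, a minimal flattening helper with the behavior described here (an illustrative sketch, not necessarily the repository's exact code):

```python
# Illustrative sketch of a list-of-lists flattening helper.
def merge_lists_sketch(lst: list[list]) -> list:
    results_lst = []
    for sub_lst in lst:          # each element is assumed to be a list
        results_lst += sub_lst
    return results_lst


print(merge_lists_sketch([[0, 1, 2], [3, 4, 5], [9]]))  # [0, 1, 2, 3, 4, 5, 9]
```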
tests/test_singlenodeexecutor_dependencies.py (2)

3-3: Import additions are appropriate. The added `sleep, time` imports are correctly used in the new test methods for timing-related assertions.
58-75: Comprehensive integration test for batching functionality. The test effectively validates the end-to-end batching workflow with correct mathematical assertions. The timing comparison (`t3 - t2 > t2 - t1`) appropriately verifies that result retrieval takes longer than task submission. Note: the test uses the private API `_task_scheduler.batched()`, which may be intentional for internal testing, but consider exposing this functionality publicly if it's meant for user consumption.
tests/test_standalone_batched.py
Outdated
```python
def test_check_for_available_batch(self):
    lst = []
    for i in list(range(10)):
        f = Future()
        f.set_result(i)
        lst.append(f)
    f1 = Future()
    f1.set_result([0, 1, 2])
    f2 = Future()
    f2.set_result([3, 4, 5])
    f3 = Future()
    f3.set_result([6, 7, 8])
    self.assertEqual(check_for_available_batch(lst=lst, n=3, skip_lst=[]), [0, 1, 2])
    self.assertEqual(check_for_available_batch(lst=lst, skip_lst=[f1], n=3), [3, 4, 5])
    self.assertEqual(check_for_available_batch(lst=lst, skip_lst=[f1, f2], n=3), [6, 7, 8])
    self.assertEqual(check_for_available_batch(lst=lst, skip_lst=[f1, f2, f3], n=3), [9])
```
💡 Verification agent
🧩 Analysis chain
Test may validate incorrect behavior due to implementation issues.
The test structure is well-designed and covers the main scenarios. However, given the logical issues identified in `check_for_available_batch`, this test might be validating incorrect behavior rather than the intended functionality. After fixing the implementation issues in `check_for_available_batch`, verify that this test still passes or update it accordingly:
🏁 Script executed:
```bash
#!/bin/bash
# Description: Run the batched tests to verify behavior after implementation fixes
# Expected: Tests should pass with corrected implementation

cd tests
python -m pytest test_standalone_batched.py::TestBatched::test_check_for_available_batch -v
```
Length of output: 1822
Missing dependency: `cloudpickle` causing test failures

The `test_standalone_batched.py` import of `executorlib.standalone.serialize` fails because `cloudpickle` isn't installed in the test environment. Until this is resolved you won't be able to verify the behavior of your fixed implementation.
Please take the following actions:
- Add `cloudpickle` to your development/test dependencies (e.g., `pyproject.toml`, `requirements-dev.txt`).
- Re-run the specific test once dependencies are installed:

```bash
pip install cloudpickle
pytest tests/test_standalone_batched.py::TestBatched::test_check_for_available_batch -v
```

- Confirm that your corrected `check_for_available_batch` implementation passes these assertions, or update the test expectations if the intended behavior has changed.
🤖 Prompt for AI Agents
In tests/test_standalone_batched.py around lines 7 to 22, the test fails due to
a missing cloudpickle dependency required by executorlib.standalone.serialize.
Fix this by adding cloudpickle to the development or test dependencies file such
as pyproject.toml or requirements-dev.txt. After installing cloudpickle, rerun
the test with pytest to verify the check_for_available_batch function behaves as
expected and update the test assertions if the function's behavior has changed.
Actionable comments posted: 0
🧹 Nitpick comments (4)
tests/test_cache_fileexecutor_mpi.py (1)
40-43: LGTM! Consider combining the nested `with` statements for better readability. The test correctly verifies that `FileTaskScheduler` raises `NotImplementedError` when `batched` is called, which aligns with the base class behavior. Apply this diff to combine the nested `with` statements:

```diff
-    def test_batched_error(self):
-        with self.assertRaises(NotImplementedError):
-            with FileTaskScheduler() as exe:
-                exe.batched(iterable=[])
+    def test_batched_error(self):
+        with self.assertRaises(NotImplementedError), \
+             FileTaskScheduler() as exe:
+            exe.batched(iterable=[])
```

executorlib/executor/base.py (1)
53-68: LGTM! Fix typo in the docstring. The method correctly delegates batching functionality to the underlying task scheduler. The implementation follows the established pattern of delegating to `_task_scheduler`. Apply this diff to fix the typo in the docstring:

```diff
     Args:
         iterable (list): list of future objects to batch based on which future objects finish first
-        n (int): badge size
+        n (int): batch size

     Returns:
         list[Future]: list of future objects one for each batch
```

executorlib/task_scheduler/base.py (1)
72-87: LGTM! Fix typo in the docstring. The method correctly declares the batching interface for the base class, raising `NotImplementedError` as expected. The signature is consistent with the executor-level method. Apply this diff to fix the typo in the docstring:

```diff
     Args:
         iterable (list): list of future objects to batch based on which future objects finish first
-        n (int): badge size
+        n (int): batch size

     Returns:
         list[Future]: list of future objects one for each batch
```

tests/test_singlenodeexecutor_dependencies.py (1)
77-80: LGTM! Consider combining the nested `with` statements for consistency. The test correctly verifies that calling `batched` without the required `n` parameter raises a `TypeError`. Apply this diff to combine the nested `with` statements:

```diff
-    def test_batched_error(self):
-        with self.assertRaises(TypeError):
-            with SingleNodeExecutor() as exe:
-                exe.batched([])
+    def test_batched_error(self):
+        with self.assertRaises(TypeError), \
+             SingleNodeExecutor() as exe:
+            exe.batched([])
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- executorlib/executor/base.py (1 hunks)
- executorlib/task_scheduler/base.py (1 hunks)
- executorlib/task_scheduler/interactive/dependency.py (3 hunks)
- tests/test_cache_fileexecutor_mpi.py (1 hunks)
- tests/test_singlenodeexecutor_dependencies.py (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
tests/test_cache_fileexecutor_mpi.py (4)
- tests/test_singlenodeexecutor_dependencies.py (1): test_batched_error (77-80)
- executorlib/task_scheduler/file/task_scheduler.py (1): FileTaskScheduler (30-79)
- executorlib/executor/base.py (1): batched (53-68)
- executorlib/task_scheduler/base.py (1): batched (72-87)

executorlib/task_scheduler/base.py (2)
- executorlib/executor/base.py (1): batched (53-68)
- executorlib/task_scheduler/interactive/dependency.py (1): batched (157-192)

executorlib/task_scheduler/interactive/dependency.py (4)
- executorlib/executor/base.py (1): batched (53-68)
- executorlib/task_scheduler/base.py (1): batched (72-87)
- executorlib/standalone/batched.py (1): check_for_available_batch (4-16)
- executorlib/standalone/interactive/arguments.py (1): update_futures_in_input (69-93)
🪛 Ruff (0.12.2)
tests/test_cache_fileexecutor_mpi.py
41-42: Use a single `with` statement with multiple contexts instead of nested `with` statements. Combine `with` statements. (SIM117)

tests/test_singlenodeexecutor_dependencies.py
78-79: Use a single `with` statement with multiple contexts instead of nested `with` statements. Combine `with` statements. (SIM117)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: notebooks
- GitHub Check: unittest_win
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_flux_mpich
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_slurm_mpich
- GitHub Check: notebooks_integration
🔇 Additional comments (3)
tests/test_singlenodeexecutor_dependencies.py (1)
58-75: Excellent comprehensive test for the batching functionality. The test thoroughly validates the batching behavior including:
- Correct batching of futures into groups of size 3
- Proper chaining of batch results through sum operations
- Expected total sum calculation (45 for range(10))
- Correct number of batches (4 for 10 items with batch size 3)
- Timing constraints to verify asynchronous execution
executorlib/task_scheduler/interactive/dependency.py (2)
157-192: Solid implementation of the batching functionality. The method correctly:
- Validates that the `n` parameter is provided (raising `TypeError` if missing)
- Calculates the expected number of batches using ceiling division
- Creates futures for each batch and queues them with appropriate metadata
- Maintains `skip_lst` to track previously processed batches
- Returns the list of batch futures

The logic for incrementally building `skip_lst` by copying and appending each new future is correct and prevents interference between batches, as sketched below.
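A rough, hypothetical sketch of that flow; the `submit_batch_task` callable and its arguments are placeholders for illustration, not the PR's exact internals:

```python
# Hypothetical sketch of the batching flow summarized above: ceiling division
# for the number of batches, one Future per batch, and a growing skip_lst so
# every batch excludes the futures already claimed by earlier batches.
from concurrent.futures import Future


def batched_sketch(submit_batch_task, iterable: list[Future], n: int) -> list[Future]:
    if n is None:
        raise TypeError("batched() missing required argument 'n' (pos 2)")
    num_batches = (len(iterable) + n - 1) // n  # ceiling division
    skip_lst: list[Future] = []
    batch_future_lst: list[Future] = []
    for _ in range(num_batches):
        # submit_batch_task is assumed to enqueue a task that later resolves to
        # the next n completed results not already covered by skip_lst
        batch_future = submit_batch_task(iterable, skip_lst.copy(), n)
        skip_lst.append(batch_future)          # later batches must skip this one
        batch_future_lst.append(batch_future)
    return batch_future_lst
```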
319-338: Well-implemented batching logic integration. The changes correctly differentiate between regular tasks and batched tasks:
- Regular tasks wait for all futures in `future_lst` to be done
- Batched tasks wait for all futures in `skip_lst` to be done, then call `check_for_available_batch`
- If no batch is ready (`done_lst` is empty), the task remains in the waiting list
- If a batch is ready, the future is set with the batch results

This integrates cleanly with the existing task processing logic; a sketch of the branch appears below.
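A minimal sketch of that waiting-loop branch. The task-dictionary keys and the surrounding loop are assumptions inferred from this review; only the `check_for_available_batch` call mirrors the PR's standalone helper (which was also named `batched_futures` at one point):

```python
# Hypothetical sketch of the branch described above, not the repository's code.
from executorlib.standalone.batched import check_for_available_batch


def process_batched_task(task_dict: dict, waiting_lst: list) -> None:
    if all(f.done() for f in task_dict["skip_lst"]):
        done_lst = check_for_available_batch(
            lst=task_dict["lst"], skip_lst=task_dict["skip_lst"], n=task_dict["n"]
        )
        if done_lst:  # a full batch (or the final shorter one) is ready
            task_dict["future"].set_result(done_lst)
            return  # finished: do not re-queue this task
    waiting_lst.append(task_dict)  # otherwise keep the task in the waiting list
```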
Actionable comments posted: 0
♻️ Duplicate comments (1)
executorlib/standalone/batched.py (1)
4-29: Fix critical logical issues in the batching function. The function still has the same critical issues identified in previous reviews:

- Incorrect parameter handling: The `skip_lst` parameter is typed as `list[list]` but the function expects it to contain Future results, not lists.
- Flawed completion logic: Line 26 compares `len(lst) - len(skipped_elements_lst)`, which compares the count of total futures with the count of merged elements, not the count of skipped futures.
- Missing validation: The function doesn't validate the `n` parameter for None or invalid values.

The batching logic needs to be corrected to properly handle the skip list and completion conditions.
🧹 Nitpick comments (2)
executorlib/standalone/batched.py (1)
32-45: Improve docstring accuracy and consider a more efficient implementation. The function has a minor documentation issue and could be optimized:

- Inaccurate docstring: Line 34 says "Merge two lists" but the function accepts a list of any length.
- Potential for optimization: The current implementation could be more efficient.

Apply this diff to fix the docstring:

```diff
 def _merge_lists(lst: list[list]) -> list:
     """
-    Merge two lists into a new list.
+    Merge multiple lists into a single flat list.

     Args:
         lst (list): list of lists

     Returns:
         list: merged list of lists
     """
```

Consider using a more efficient implementation:

```diff
 def _merge_lists(lst: list[list]) -> list:
     """
     Merge multiple lists into a single flat list.

     Args:
         lst (list): list of lists

     Returns:
         list: merged list of lists
     """
-    results_lst = []
-    for el in lst:
-        results_lst += el
-    return results_lst
+    return [item for sublist in lst for item in sublist]
```

executorlib/task_scheduler/interactive/dependency.py (1)

157-192: Add input validation for robustness. The batching logic is well-implemented, but could benefit from additional input validation. Consider adding validation for the input parameters:

```diff
     def batched(
         self,
         iterable: list[Future],
         n: Optional[int] = None,
     ) -> list[Future]:
         """
         Batch futures from the iterable into tuples of length n. The last batch may be shorter than n.

         Args:
             iterable (list): list of future objects to batch based on which future objects finish first
             n (int): badge size

         Returns:
             list[Future]: list of future objects one for each batch
         """
         if n is None:
             raise TypeError("batched() missing required argument 'n' (pos 2)")
+        if n <= 0:
+            raise ValueError("batch size 'n' must be positive")
+        if not iterable:
+            return []
+        if not all(isinstance(item, Future) for item in iterable):
+            raise TypeError("iterable must contain only Future objects")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- executorlib/standalone/batched.py (1 hunks)
- executorlib/task_scheduler/interactive/dependency.py (3 hunks)
- tests/test_standalone_batched.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/test_standalone_batched.py
🧰 Additional context used
🧬 Code Graph Analysis (2)
executorlib/standalone/batched.py (1)
- executorlib/task_scheduler/file/shared.py (2): done (41-49), result (25-39)

executorlib/task_scheduler/interactive/dependency.py (4)
- executorlib/executor/base.py (1): batched (53-68)
- executorlib/task_scheduler/base.py (1): batched (72-87)
- executorlib/standalone/batched.py (1): batched_futures (4-29)
- executorlib/standalone/interactive/arguments.py (1): update_futures_in_input (69-93)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: notebooks_integration
- GitHub Check: unittest_flux_mpich
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_win
- GitHub Check: notebooks
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_slurm_mpich
🔇 Additional comments (2)
executorlib/task_scheduler/interactive/dependency.py (2)
7-7: LGTM! The import is correctly added to support the batching functionality.
319-338: Integration logic is correct but depends on fixing batched_futures. The integration of batching logic into the task waiting mechanism is well-implemented:

- Proper separation: Correctly separates "batched" tasks from regular task processing
- Correct parameter passing: Passes the right parameters to `batched_futures`
- Appropriate waiting logic: Waits for skip_lst futures to complete before processing

However, this implementation depends on the `batched_futures` function, which has critical logical issues that need to be addressed first. The batching integration will work correctly once the underlying `batched_futures` function is fixed per the issues identified in `executorlib/standalone/batched.py`.
a5d1f97 to 39412a0 (Compare)
Actionable comments posted: 0
🧹 Nitpick comments (2)
tests/test_cache_fileexecutor_mpi.py (1)
40-43: Simplify nested `with` statements and approve test logic. The test correctly verifies that `FileTaskScheduler` raises `NotImplementedError` when `batched()` is called. However, the nested `with` statements can be simplified. Apply this diff to combine the `with` statements:

```diff
-    def test_batched_error(self):
-        with self.assertRaises(NotImplementedError):
-            with FileTaskScheduler() as exe:
-                exe.batched(iterable=[])
+    def test_batched_error(self):
+        with self.assertRaises(NotImplementedError), FileTaskScheduler() as exe:
+            exe.batched(iterable=[])
```

tests/test_singlenodeexecutor_dependencies.py (1)
77-80: Simplify nested `with` statements and approve error handling test. The test correctly verifies that calling `batched()` with an empty list raises a `TypeError`. However, the nested `with` statements can be simplified. Apply this diff to combine the `with` statements:

```diff
-    def test_batched_error(self):
-        with self.assertRaises(TypeError):
-            with SingleNodeExecutor() as exe:
-                exe.batched([])
+    def test_batched_error(self):
+        with self.assertRaises(TypeError), SingleNodeExecutor() as exe:
+            exe.batched([])
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- executorlib/executor/base.py (1 hunks)
- executorlib/standalone/batched.py (1 hunks)
- executorlib/task_scheduler/base.py (1 hunks)
- executorlib/task_scheduler/interactive/dependency.py (3 hunks)
- tests/test_cache_fileexecutor_mpi.py (1 hunks)
- tests/test_singlenodeexecutor_dependencies.py (2 hunks)
- tests/test_standalone_batched.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- tests/test_standalone_batched.py
- executorlib/task_scheduler/base.py
- executorlib/executor/base.py
- executorlib/task_scheduler/interactive/dependency.py
- executorlib/standalone/batched.py
🧰 Additional context used
🪛 Ruff (0.12.2)
tests/test_cache_fileexecutor_mpi.py
41-42: Use a single `with` statement with multiple contexts instead of nested `with` statements. Combine `with` statements. (SIM117)

tests/test_singlenodeexecutor_dependencies.py
78-79: Use a single `with` statement with multiple contexts instead of nested `with` statements. Combine `with` statements. (SIM117)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_win
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_slurm_mpich
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: notebooks_integration
- GitHub Check: unittest_flux_mpich
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: notebooks
🔇 Additional comments (2)
tests/test_singlenodeexecutor_dependencies.py (2)
3-3: LGTM! The `time` import is correctly added to support timing assertions in the new batching test.
58-75: Excellent comprehensive test for batching functionality. The test thoroughly validates the batching behavior by:
- Creating 10 tasks and batching them into groups of 3
- Verifying the correct number of batches (4) and total sum (45)
- Including timing assertions to ensure batching execution takes longer than submission
The logic correctly expects 4 batches from 10 items with batch size 3: [0,1,2], [3,4,5], [6,7,8], [9].
I was just reading it out of curiosity, but since I noticed some stuff I left comments. I wouldn’t be mad if you ignore them.
Co-authored-by: Sam Dareska <[email protected]>
Co-authored-by: Sam Dareska <[email protected]>
Thanks a lot @samwaseda, I am always happy to get feedback. I included your suggestions in the pull request.
Actionable comments posted: 0
🧹 Nitpick comments (2)
tests/test_cache_fileexecutor_mpi.py (1)
40-43: LGTM! Test correctly verifies the expected error behavior. The test properly validates that `FileTaskScheduler.batched()` raises `NotImplementedError`, which aligns with the base class implementation in `TaskSchedulerBase`. Consider combining the nested `with` statements for better readability:

```diff
-    def test_batched_error(self):
-        with self.assertRaises(NotImplementedError):
-            with FileTaskScheduler() as exe:
-                exe.batched(iterable=[], n=2)
+    def test_batched_error(self):
+        with self.assertRaises(NotImplementedError), FileTaskScheduler() as exe:
+            exe.batched(iterable=[], n=2)
```

executorlib/standalone/batched.py (1)
4-27: Core batching logic is correct, but consider optimizing the skipped elements lookup. The function correctly implements the batching logic by flattening the skip list, calculating the expected batch size, and collecting results from completed futures. For better performance with large skip lists, consider using a set for faster lookups:

```diff
 def batched_futures(lst: list[Future], skip_lst: list[list], n: int) -> list[list]:
     """
     Batch n completed future objects. If the number of completed futures is smaller than n and the end of
     the batch is not reached yet, then an empty list is returned. If n future objects are done, which are
     not included in the skip_lst then they are returned as batch.

     Args:
         lst (list): list of all future objects
         skip_lst (list): list of previous batches of future objects
         n (int): batch size

     Returns:
         list: results of the batched futures
     """
-    skipped_elements_lst = [item for items in skip_lst for item in items]
+    skipped_elements_set = {item for items in skip_lst for item in items}
     done_lst = []
     n_expected = min(n, len(lst) - len(skipped_elements_lst))
     for v in lst:
-        if v.done() and v.result() not in skipped_elements_lst:
+        if v.done() and v.result() not in skipped_elements_set:
             done_lst.append(v.result())
             if len(done_lst) == n_expected:
                 return done_lst
     return []
```

Note: You'll also need to update the `n_expected` calculation to use `len(skipped_elements_set)` instead of `len(skipped_elements_lst)`.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- executorlib/executor/base.py (1 hunks)
- executorlib/standalone/batched.py (1 hunks)
- executorlib/task_scheduler/base.py (1 hunks)
- executorlib/task_scheduler/interactive/dependency.py (3 hunks)
- tests/test_cache_fileexecutor_mpi.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- executorlib/task_scheduler/base.py
- executorlib/executor/base.py
- executorlib/task_scheduler/interactive/dependency.py
🧰 Additional context used
🧬 Code Graph Analysis (2)
executorlib/standalone/batched.py (1)
- executorlib/task_scheduler/file/shared.py (2): done (41-49), result (25-39)

tests/test_cache_fileexecutor_mpi.py (5)
- tests/test_singlenodeexecutor_dependencies.py (1): test_batched_error (77-80)
- executorlib/task_scheduler/file/task_scheduler.py (1): FileTaskScheduler (28-80)
- executorlib/task_scheduler/base.py (1): batched (72-87)
- executorlib/executor/base.py (1): batched (53-68)
- executorlib/task_scheduler/interactive/dependency.py (1): batched (157-189)
🪛 Ruff (0.12.2)
tests/test_cache_fileexecutor_mpi.py
41-42: Use a single `with` statement with multiple contexts instead of nested `with` statements. Combine `with` statements. (SIM117)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: unittest_win
- GitHub Check: unittest_old
- GitHub Check: notebooks
- GitHub Check: unittest_slurm_mpich
Cumulative batching:

```python
from executorlib import SingleNodeExecutor


def reply(i):
    return i


def special_sum(lst):
    return sum([i for sub in lst for i in sub])


with SingleNodeExecutor() as exe:
    print("step: 0")
    future_first_lst = []
    for i in range(10):
        future_first_lst.append(exe.submit(reply, i))
    print("step: 1")
    n = 3
    future_second_lst = exe._task_scheduler.batched(future_first_lst, n=n)
    print("step: 2")
    future_third_lst = []
    prev_lst = []
    for f in future_second_lst:
        prev_lst.append(f)
        future_third_lst.append(exe.submit(special_sum, prev_lst.copy()))
    print("step: 3")
    lst = [f.result() for f in future_third_lst]
    print(lst, sum(lst))
```
The following snippet illustrates the aliasing behavior:

```python
L = []


def demo(a):
    a.append(1)


demo(L)
print(L)
```

Here the function modifies the list from outside its own scope, while in the executorlib snippet the loop keeps modifying the list that was passed as an input argument while the submitted function is still waiting to be executed.
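A small, self-contained illustration of why the example above passes `prev_lst.copy()` rather than `prev_lst` itself (plain Python, no executor involved):

```python
# Without copy(), every snapshot aliases the same list object and all end up
# equal to its final state; copy() freezes the contents at submission time.
prev_lst = []
snapshots_with_copy = []
snapshots_without_copy = []
for i in range(3):
    prev_lst.append(i)
    snapshots_with_copy.append(prev_lst.copy())
    snapshots_without_copy.append(prev_lst)

print(snapshots_with_copy)     # [[0], [0, 1], [0, 1, 2]]
print(snapshots_without_copy)  # [[0, 1, 2], [0, 1, 2], [0, 1, 2]]
```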