
Refactor and Enhance Dispatcher: Worker Management, pg_notify Locking, Error Handling, and More #1

Merged

Conversation

art-tapin

Hey Alan!

I've been diving pretty deep into the dispatcher code while reviewing this PR (#121), and I ended up contributing a bunch of changes I thought might help us all. Since I'm still learning some of these concurrency and messaging patterns, I used this as an opportunity to explore new concepts. I tested everything as much as I could, though in a few places (like subtle race conditions) the tests weren't quite as helpful as I'd hoped. I'm definitely open to you cherry-picking only the changes you like. Here's a rundown of what I did (rough code sketches of the bigger items follow the list):

  1. manage_old_workers Two-Phase Locking (potentially closes ansible/dispatcherd#124, "Worker exits processed incorrectly if shutdown after scale-down events")

    • I refactored the worker-removal logic to avoid race conditions that can arise when iterating over self.workers. The old approach either did everything without a lock or held the lock for the entire operation, which risked either inconsistent snapshots or blocking other tasks for too long.
    • The new strategy is a two-phase process. First, we quickly acquire the lock and take a snapshot of the current workers, then immediately release the lock. While we have that snapshot, we do the heavier work like await worker.stop(). Finally, we re-acquire the lock to remove any workers that have been marked for deletion.
    • This avoids constantly locking the shared worker collection (which can block other methods like task dispatching), but still ensures no new references are added to workers we’re trying to remove. In practice, it makes shutdown a lot more robust while keeping concurrency overhead in check.
  2. pg_notify Changes: ConnectionSaver Thread Safety & Closed Connection Fixes

    • I initially introduced a threading.Lock in ConnectionSaver so that multiple threads can't simultaneously create a new connection; even when several threads race to initialize it, only one cached connection is ever created.
    • While testing with ./run_demo.py, I ran into issues where the demo sometimes tried to use a connection that had already been closed. To fix that, I added a check (connection.closed != 0) so that if the connection is no longer valid, we recreate it before handing it out again. That way, subsequent calls always get a live connection and the demo runs without errors.
    • I added tests that verify we only create one connection across multiple threads, although reproducing every edge case can be tricky in a local environment. Nonetheless, this change should keep the dispatcher from hitting race conditions or attempting to reuse a closed connection.
  3. Resource Cleanup in Control Module

    • I noticed that when we create a new broker in acontrol_with_reply, acontrol, control_with_reply, and control, we never explicitly close the connections afterward, which could lead to resource leaks.
    • I added try/finally blocks (or their equivalent) so that each ephemeral broker is properly closed via .close() or .aclose(). That way, we don't leave open connections lying around.
    • I wrote tests to confirm that cleanup is called, but real concurrency scenarios are obviously more complicated.
  4. NextWakeupRunner Error Propagation

    • The method process_wakeups swallowed any exception from process_object, which made debugging tough. I wrapped it in a try/except, logged the error, and re-raised it.
    • I also added a couple simple tests to make sure normal usage is still fine, plus a test that triggers the error path to confirm the exception is indeed propagated.
  5. Removing Redundant Lock in WorkerPool.dispatch_task

    • Since Blocker and Queuer either do their own synchronization or rely on the management_lock in a consistent way, we were double-locking in dispatch_task. I refactored it so we only lock for the part that actually modifies worker state (like starting a task).
    • My hope is that this reduces lock contention without compromising correctness.
  6. ProcessProxy Context Manager & Type Hints

    • Implemented __enter__ and __exit__, so we can manage worker processes with a with statement.
    • I also added some type annotations so that tricky usage and mypy checks (which can be a bit of a pain) are clearer. If the process is still alive on exit, we terminate it gracefully, falling back to kill.
  7. Smaller Tweaks

    • Renamed get_send_message to create_message in Control to better reflect that it's constructing something new.
    • Replaced a .format() call with an f-string to keep the code style consistent.
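
A rough sketch of the two-phase pattern from item 1 (this is not the actual manage_old_workers code; `self.workers`, `management_lock`, and the `marked_for_removal` check are stand-ins for the real attributes):

```python
import asyncio


class PoolSketch:
    def __init__(self) -> None:
        self.workers = {}  # worker_id -> worker object (stand-in)
        self.management_lock = asyncio.Lock()

    async def manage_old_workers(self) -> None:
        # Phase 1: take a snapshot of removal candidates under the lock, then release it.
        async with self.management_lock:
            candidates = [w for w in self.workers.values() if w.marked_for_removal]

        # Do the slow work (stopping processes) without holding the lock,
        # so task dispatching and other pool methods are not blocked.
        for worker in candidates:
            await worker.stop()

        # Phase 2: re-acquire the lock and remove the stopped workers atomically.
        async with self.management_lock:
            for worker in candidates:
                self.workers.pop(worker.worker_id, None)
```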
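
For item 2, a minimal sketch of the ConnectionSaver idea: a lock around lazy connection creation plus a liveness check before reuse. The real class also caches an async connection and gets its config elsewhere; in the PR I compare `.closed != 0`, while here I just use psycopg 3's bool `Connection.closed` property directly:

```python
import threading
from typing import Optional

import psycopg


class ConnectionSaverSketch:
    def __init__(self) -> None:
        self._connection: Optional[psycopg.Connection] = None
        self._lock = threading.Lock()

    def get_connection(self, conninfo: str) -> psycopg.Connection:
        # Only one thread at a time may create (or re-create) the cached connection.
        with self._lock:
            if self._connection is None or self._connection.closed:
                # Missing or already-closed connection: build a fresh one.
                self._connection = psycopg.connect(conninfo, autocommit=True)
            return self._connection
```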
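
For item 3, the cleanup is just try/finally around the ephemeral broker. The broker factory and publish call below are placeholders for whatever the control module really uses; the point is the finally block:

```python
async def acontrol_sketch(broker_factory, command: str) -> None:
    broker = broker_factory()  # however the real code builds its ephemeral broker
    try:
        await broker.apublish_message(message=command)  # placeholder publish call
    finally:
        # Always release the connection, even if publishing raised.
        await broker.aclose()
```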
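
For item 4, the NextWakeupRunner change is just a try/except that logs with the traceback and re-raises, so failures in the callback are no longer swallowed. The iteration and the `process_object` signature here are simplified assumptions:

```python
import logging

logger = logging.getLogger(__name__)


async def process_wakeups(self, current_time: float) -> None:
    for obj in list(self.wakeup_objects):  # simplified: whatever is due for a wakeup
        try:
            await self.process_object(obj)
        except Exception:
            # Log with traceback, then re-raise so the surrounding task still sees the failure.
            logger.exception(f"Error processing wakeup for {obj}")
            raise
```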
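
For item 5, roughly the shape I ended up with in dispatch_task: the Blocker and Queuer decisions happen without the lock, and management_lock only wraps the part that hands the task to a worker. The method names below are illustrative, not the real API:

```python
async def dispatch_task(self, message: dict) -> None:
    if self.blocker.should_block(message):          # Blocker handles its own bookkeeping
        self.blocker.add(message)
        return

    worker = self.queuer.get_free_worker(message)   # illustrative lookup
    if worker is None:
        self.queuer.add(message)                    # queue for later; no lock needed
        return

    # Lock only while mutating shared worker state.
    async with self.workers.management_lock:
        await worker.start_task(message)
```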
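
And for item 6, the context-manager protocol on ProcessProxy looks roughly like this; the 3-second timeout and the kill fallback are the behavior I was going for, with `_process` standing in for the wrapped multiprocessing.Process:

```python
import multiprocessing
from types import TracebackType
from typing import Optional, Type


class ProcessProxySketch:
    def __init__(self, target, *args) -> None:
        self._process = multiprocessing.Process(target=target, args=args)

    def __enter__(self) -> "ProcessProxySketch":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        if self._process.is_alive():
            self._process.terminate()        # ask politely (SIGTERM) first
            self._process.join(timeout=3)
            if self._process.is_alive():
                self._process.kill()         # escalate if it did not exit
                self._process.join()
        return None  # never suppress exceptions from the with-block
```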

Throughout these changes, I tried to test everything thoroughly. Some concurrency aspects are still tough to fully validate in a purely local environment (like race conditions that might only appear under big loads), but hopefully the approach is sound. If you prefer smaller bites, you can definitely pick the parts you want (like just the ConnectionSaver fix or the manage_old_workers refactor) and leave anything you’re unsure about.

Tests are green, ./run_demo.py is working.

Anyway, let me know what you think. I'd really like to learn more together 👍

art-tapin added these commits

Improve error handling in BrokerCallbacks.listen_for_replies
- Add JSON parsing with exception handling to ignore malformed messages.
- Log warnings when invalid JSON is received.
- Add a unit test

Rename get_send_message to create_message for clarity
- Change method name in Control class to better reflect its role in constructing messages.

Add resource cleanup in Control methods and tests
- Ensure broker connections are closed in acontrol, control_with_reply, and control.
- Update tests to verify that aclose() or close() is called appropriately.

Fix race condition in manage_old_workers and add tests
- Refactor manage_old_workers to use a two-phase locking approach.
- Take a snapshot under lock, process removals, then re-acquire the lock to remove workers atomically.

Potentially closes ansible#124

Improve error propagation in NextWakeupRunner.process_wakeups
- Wrap process_object callback in try/except to log and re-raise errors.
- Add unit tests to verify normal operation and error propagation.

pg_notify: Improve ConnectionSaver caching, thread safety, and type correctness

-- Squashed --

Fix ConnectionSaver caching and type issues for closed connections
- Update get_connection and aget_connection to check if the cached connection is closed (i.e. .closed != 0) and reinitialize it if so, ensuring that run_demo.py and other users always receive a live connection.
- Add type assertions to guarantee that a valid (non-None) connection is returned, resolving mypy errors.

Add thread safety to ConnectionSaver in pg_notify.py and add tests
- Introduce a threading.Lock in ConnectionSaver to protect _connection and _async_connection.
- Wrap the initialization in connection_saver and async_connection_saver with the lock to avoid race conditions.
- Update tests to verify that concurrent access creates only one connection.

Note: We use a standard threading.Lock because this is protecting shared state across threads.

Remove redundant lock in WorkerPool.dispatch_task
- Refactor dispatch_task to avoid holding workers.management_lock for the entire operation.
- Blocker and Queuer functions are expected to be used within the WorkerPool context, so extra locking is unnecessary.

Add type annotations to context manager methods in ProcessProxy
- Implement __enter__ and __exit__ with proper type annotations.
- __exit__ ensures that a running process is terminated (or killed) and joined. It returns Optional[bool] and ensures proper process cleanup.

Use f-string in control.py log message
- Replace .format() with f-string for improved readability in control-and-reply log message.
"Return existing connection or create a new one"
if not self._async_connection:
# Check if the cached async connection is either None or closed.
if not self._async_connection or getattr(self._async_connection, "closed", 0) != 0:
AlanCoding (Owner)

There's got to be a story behind getattr(self._async_connection, "closed", 0) here, and I'd like to hear it!

```python
try:
    message = json.loads(payload) if isinstance(payload, str) else payload
    self.received_replies.append(message)
except json.JSONDecodeError as e:
    logger.warning(f"Invalid JSON on channel '{channel}': {payload[:100]}... (Error: {e})")
```
AlanCoding (Owner)

It would be more localized to do this same thing inside of parse_replies. If you wanted to include the channel information, that wouldn't be completely unreasonable to pass through to that method, but I think channel is low-quality information. There are not going to be multiple channels involved, so we shouldn't care to log it.

AlanCoding merged commit 2de74c1 into AlanCoding:more_type_hints on Mar 14, 2025
7 checks passed
AlanCoding pushed a commit that referenced this pull request Mar 14, 2025
Refactor and Enhance Dispatcher: Worker Management, pg_notify Locking, Error Handling, and More (#1)

* Improve protocol documentation by adding docstrings

* Improve error handling in BrokerCallbacks.listen_for_replies

- Add JSON parsing with exception handling to ignore malformed messages.
- Log warnings when invalid JSON is received.
- Add a unit test

* Rename get_send_message to create_message for clarity

- Change method name in Control class to better reflect its role in constructing messages.

* Add resource cleanup in Control methods and tests

- Ensure broker connections are closed in acontrol, control_with_reply, and control.
- Update tests to verify that aclose() or close() is called appropriately.

* Fix race condition in manage_old_workers and add tests

- Refactor manage_old_workers to use a two-phase locking approach.
- Take a snapshot under lock, process removals, then re-acquire the lock to remove workers atomically.

Potentially closes ansible#124

* Improve error propagation in NextWakeupRunner.process_wakeups

- Wrap process_object callback in try/except to log and re-raise errors.
- Add unit tests to verify normal operation and error propagation.

* pg_notify: Improve ConnectionSaver caching, thread safety, and type correctness

-- Squashed --

Fix ConnectionSaver caching and type issues for closed connections

- Update get_connection and aget_connection to check if the cached connection is closed (i.e. .closed != 0) and reinitialize it if so, ensuring that run_demo.py and other users always receive a live connection.
- Add type assertions to guarantee that a valid (non-None) connection is returned, resolving mypy errors.

Add thread safety to ConnectionSaver in pg_notify.py and add tests

- Introduce a threading.Lock in ConnectionSaver to protect _connection and _async_connection.
- Wrap the initialization in connection_saver and async_connection_saver with the lock to avoid race conditions.
- Update tests to verify that concurrent access creates only one connection.

Note: We use a standard threading.Lock because this is protecting shared state across threads.

* Remove redundant lock in WorkerPool.dispatch_task

- Refactor dispatch_task to avoid holding workers.management_lock for the entire operation.
- Blocker and Queuer functions are expected to be used within the WorkerPool context, so extra locking is unnecessary.

* Add type annotations to context manager methods in ProcessProxy

- Implement __enter__ and __exit__ with proper type annotations.
- __exit__ ensures that a running process is terminated (or killed) and joined. It returns Optional[bool] and ensures proper process cleanup.

* Use f-string in control.py log message

Replace .format() with f-string for improved readability
in control-and-reply log message.
AlanCoding (Owner)

> To fix that, I added a check (connection.closed != 0) so that if the connection is no longer valid, we recreate it before handing it out again. That way, subsequent calls always get a live connection and the demo runs without errors.

I added some debugging, and now I don't think this is technically okay, but only because:

$ python
Python 3.12.8 (main, Dec  6 2024, 00:00:00) [GCC 14.2.1 20240912 (Red Hat 14.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> False == 0
True

This was not obvious to me. I don't love it.
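
An explicit spelling that sidesteps the bool/int comparison (just a sketch, assuming psycopg 3's bool `Connection.closed` property; `create_new_connection` is a stand-in for the real setup code) would be:

```python
if connection is None or connection.closed:
    connection = create_new_connection()  # stand-in for the real connection setup
```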

AlanCoding pushed a commit that referenced this pull request Mar 18, 2025
Refactor and Enhance Dispatcher: Worker Management, pg_notify Locking, Error Handling, and More (#1)
