Allow using forkserver #78

Merged: 3 commits into ansible:main on Feb 27, 2025
Conversation

@AlanCoding (Member) commented on Feb 15, 2025:

Fixes #39

Fixes #40

I had another idea to do a callback hook that would set up Django, and employ a hazmat.py module in dispatcher. But I've abandoned that, because it's very non-obvious and muddies the contract with the app. If it's important, the app can set up such a module itself and pass it in via the parameter added here.
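For illustration, a minimal sketch of what such an app-owned setup module could look like; the module path and the way the dispatcher would receive it are assumptions, not the actual API:

```python
# myapp/worker_setup.py -- hypothetical app-owned module; how the dispatcher
# imports it (the new parameter) is an assumption, not the real API surface.
import os

import django


def setup() -> None:
    """Prepare a worker process before any task code runs."""
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
    django.setup()
```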

This isn't yet coherent to configure; that will be delayed until #64 gets in. It's a bit hard because the process manager is nested down low:

  • DispatcherMain
    • WorkerPool
      • ProcessManager

So that's 3 levels deep, and that's something to be cleaned up with the factories logic.
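As a rough sketch of what that nesting looks like today (stand-in classes, not the real implementation; the process_manager_cls parameter name is an assumption):

```python
# Stand-in classes showing why the process-manager choice has to be threaded
# through three constructors; not the real dispatcher code.
class ForkServerManager:  # the new manager added in this PR
    ...


class WorkerPool:
    def __init__(self, process_manager_cls):
        self.process_manager = process_manager_cls()


class DispatcherMain:
    def __init__(self, process_manager_cls):
        # The choice made at the top has to travel two constructors down.
        self.pool = WorkerPool(process_manager_cls=process_manager_cls)


# Selecting forkserver therefore means passing the class in from the very top:
dispatcher = DispatcherMain(process_manager_cls=ForkServerManager)
```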

@AlanCoding (author) commented:

Right now this looks like a legitimate failure:

_________________________ test_cancel_task[forkserver] _________________________

apg_dispatcher = <dispatcher.main.DispatcherMain object at 0x7f9e6e318f50>
pg_message = <function pg_message.<locals>._rf at 0x7f9e6e320220>
pg_control = <dispatcher.control.Control object at 0x7f9e6e9dd480>

    @pytest.mark.asyncio
    async def test_cancel_task(apg_dispatcher, pg_message, pg_control):
        msg = json.dumps({'task': 'lambda: __import__("time").sleep(3.1415)', 'uuid': 'foobar'})
        await pg_message(msg)
    
        clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
        await asyncio.sleep(0.04)
        canceled_jobs = await asyncio.wait_for(pg_control.acontrol_with_reply('cancel', data={'uuid': 'foobar'}, timeout=1), timeout=5)
        worker_id, canceled_message = canceled_jobs[0][0]
        assert canceled_message['uuid'] == 'foobar'
        await asyncio.wait_for(clearing_task, timeout=3)
    
        pool = apg_dispatcher.pool
>       assert [pool.finished_count, pool.canceled_count, pool.control_count] == [0, 1, 1], 'cts: [finished, canceled, control]'
E       AssertionError: cts: [finished, canceled, control]
E       assert [0, 0, 1] == [0, 1, 1]
E         
E         At index 1 diff: 0 != 1
E         
E         Full diff:
E           [
E               0,
E         -     1,
E         ?     ^
E         +     0,
E         ?     ^
E               1,
E           ]

And this just leads to questions about whether we have the right pid, and whether there's any nuance related to that.
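For what it's worth, outside the dispatcher entirely, the pid the parent records for a forkserver child should match what the child reports; a standalone sanity check with only the standard library:

```python
# Check that the parent-side pid matches the child's os.getpid() under forkserver.
import multiprocessing
import os


def report_pid(queue):
    queue.put(os.getpid())


if __name__ == "__main__":
    ctx = multiprocessing.get_context("forkserver")
    queue = ctx.Queue()
    proc = ctx.Process(target=report_pid, args=(queue,))
    proc.start()
    child_pid = queue.get()
    proc.join()
    assert child_pid == proc.pid, (child_pid, proc.pid)
    print("parent and child agree on pid:", child_pid)
```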

@AlanCoding (author) commented:

I seem to be able to reproduce the failure locally (although it is hard and does not happen consistently) with:

diff --git a/tests/integration/test_main.py b/tests/integration/test_main.py
index 9b2a137..3f7ad7a 100644
--- a/tests/integration/test_main.py
+++ b/tests/integration/test_main.py
@@ -83,7 +83,7 @@ async def test_cancel_task(apg_dispatcher, pg_message, pg_control):
     await pg_message(msg)
 
     clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
-    await asyncio.sleep(0.04)
+    await asyncio.sleep(0.0)
     canceled_jobs = await asyncio.wait_for(pg_control.acontrol_with_reply('cancel', data={'uuid': 'foobar'}, timeout=1), timeout=5)
     worker_id, canceled_message = canceled_jobs[0][0]
     assert canceled_message['uuid'] == 'foobar'

After the config work is done, it might be best for the task to send a pg_notify message when it starts so that we have a signaling mechanism to make the test fully deterministic.
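A hedged sketch of what that signaling could look like with asyncpg; the channel name and connection details here are hypothetical:

```python
# Sketch: the task NOTIFYs a channel when it starts, and the test awaits that
# instead of sleeping. Channel name and DSN handling are assumptions.
import asyncio

import asyncpg


async def wait_for_task_started(dsn: str, timeout: float = 5.0) -> None:
    started = asyncio.Event()

    def on_notify(connection, pid, channel, payload):
        started.set()

    conn = await asyncpg.connect(dsn)
    try:
        await conn.add_listener("task_started", on_notify)
        # ... submit the task here; the task body would run something like:
        #     SELECT pg_notify('task_started', 'foobar');
        await asyncio.wait_for(started.wait(), timeout=timeout)
    finally:
        await conn.close()
```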

@AlanCoding (author) commented:

Confirmed that running

py.test tests/integration/test_main.py -k test_cancel_task

results in

/usr/lib64/python3.12/multiprocessing/popen_fork.py:66: DeprecationWarning: This process (pid=248636) is multi-threaded, use of fork() may lead to deadlocks in the child.
self.pid = os.fork()

However, running

py.test tests/integration/test_main.py -k "test_cancel_task and forkserver"

which runs only the forkserver version of the test, does not hit this warning. So I will link the issue for that. Ping @kdelee FYI.
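For reference, the warning comes from the start method itself; a minimal standard-library illustration (not dispatcher code):

```python
# fork() copies the (possibly multi-threaded) parent and triggers the 3.12+
# DeprecationWarning above; forkserver forks children from a clean,
# single-threaded server process instead.
import multiprocessing


def work():
    print("hello from worker")


if __name__ == "__main__":
    ctx = multiprocessing.get_context("forkserver")  # compare with "fork"
    proc = ctx.Process(target=work)
    proc.start()
    proc.join()
```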

@AlanCoding requested a review from kdelee on February 15, 2025.
@AlanCoding (author) commented:

This should now be almost ready except for the mypy linter.

@AlanCoding marked this pull request as ready for review on February 21, 2025.
@AlanCoding (author) commented:

I was considering adding janus to this as a 3rd option (see #3). That did not work before, and I hoped forkserver might allow it to work. It did not, so I will not add that.

That's the only other thing I was considering adding to this, and I'm not going to. So I'm finished with it now.

@art-tapin (Collaborator) left a comment:

I tested the new ForkServerManager in a standalone environment (not AWX) and the results are great. Here’s a summary of what I did and what I observed:

  • Two Configs, One Script:

    • I created config_fork.yml (using ProcessManager) and config_forkserver.yml (using ForkServerManager) to compare file-descriptor usage side by side.
    • Submitted ~50 tasks (15-second sleeps each) using submit_many_tasks.py and measured FDs via a shell script (compare_fd.sh) plus pid-host-to-container.py with a loosened regex (e.g. (dispatcher-standalone|ForkServerManager|python.*forkserver)); see the FD-counting sketch after this review.
  • Dispatcher FD Test Files: forkserver-vs-fork

  • Observed FD Usage:

    • Under fork, the average FD count ended up around 67.
    • Under forkserver, the average FD count dropped dramatically to about 15.
    • Tasks (including delayed ones) and cancellations worked fine in both scenarios. The “task canceled” tracebacks during shutdown are expected as far as I can tell; they show the dispatcher is cleanly stopping workers mid-task.
  • Docs & Practical Tips:

    • The README.md and related docs match what I did, though two small points proved crucial in testing:
      1. Consistent Config: Make sure the same config file is used for both the dispatcher and any publisher script -- otherwise you get empty test results.
      2. Regex Patterns: Choose your pid-host-to-container pattern carefully. If it’s too narrow, you’ll miss worker processes; too broad, you’ll pick up old leftover processes.

Approved 🥇
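For anyone reproducing the comparison without the reviewer's scripts, a rough Python equivalent of the FD counting; it assumes a Linux /proc filesystem and that you already know the worker pids:

```python
# Count open file descriptors for a set of pids via /proc (Linux only).
import os
import sys


def fd_count(pid: int) -> int:
    return len(os.listdir(f"/proc/{pid}/fd"))


if __name__ == "__main__":
    counts = {int(arg): fd_count(int(arg)) for arg in sys.argv[1:]}
    for pid, count in counts.items():
        print(f"pid {pid}: {count} open fds")
    if counts:
        print("average:", sum(counts.values()) / len(counts))
```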

Commits:

  • Run main tests with forkserver
  • Add tests about pid accuracy
  • Get tests mostly working
  • Run linters and close connections
  • Wait for workers to be ready before starting test
  • wrap up linters
  • Update schema
  • run linters
  • Python 3.10 compat
@AlanCoding (author) commented:

The “task canceled” tracebacks during shutdown are expected as far as I can tell; they show the dispatcher is cleanly stopping workers mid-task.

Yeah, maybe we could do some aesthetic re-branding, but having the stack trace is generally super important, because tasks that time out or remain running until service shutdown are likely hanging, and that stack trace can give the line of code where they are hanging.
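As a generic aside (not something this PR implements), Python's faulthandler can surface the same kind of "where is it stuck" traceback on demand from a live worker; a minimal sketch:

```python
# Generic technique, not the dispatcher's mechanism: dump the current stacks of
# a hung process on SIGUSR1 so you can see which line it is blocked on.
import faulthandler
import signal
import time

faulthandler.register(signal.SIGUSR1)  # `kill -USR1 <pid>` prints all thread stacks

if __name__ == "__main__":
    time.sleep(3600)  # stand-in for a hung task
```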

Submitted ~50 tasks (15-second sleeps each) using submit_many_tasks.py and measured FDs via a shell script (compare_fd.sh) plus pid-host-to-container.py with a loosened regex (e.g. (dispatcher-standalone|ForkServerManager|python.*forkserver)).

Awesome, thanks. I'd like to get this number as a self-diagnostic while doing #92; I have an eye toward a pattern where this can go (in this case we start with the worker pid).

@AlanCoding merged commit 0d0e309 into ansible:main on Feb 27, 2025.
7 checks passed