Add async task background worker #4591

Open

srothh wants to merge 22 commits into srothh/worker-class-hierarchy from srothh/async-task-worker

Conversation

srothh (Member) commented Jul 17, 2025

Add a new implementation of the transport background worker based on an async task. This worker mostly mirrors the functionality of the thread-based worker, except that it exposes a non-blocking async flush (which can be awaited from an async context). Furthermore, the worker itself is not thread-safe and should be invoked via run_coroutine_threadsafe or similar when called from another thread (the transport handles this). I have kept the fork check from the threaded worker, but I am not sure it is necessary, as forking in an async application would also break the event loop.
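
For illustration, a minimal sketch of the thread-safety caveat described above (the bridging helper and its name are hypothetical, not part of this PR):

import asyncio

def flush_from_other_thread(worker, loop, timeout=2.0):
    # The AsyncWorker is not thread-safe, so from a foreign thread we
    # hand the coroutine to the worker's event loop and block on the
    # returned concurrent.futures.Future until the flush completes.
    future = asyncio.run_coroutine_threadsafe(worker.flush_async(timeout), loop)
    future.result(timeout)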

GH-4581

codecov bot commented Jul 17, 2025

❌ 54 Tests Failed:

Tests completed  Failed  Passed  Skipped
21089            54      21035   1098
View the top 3 failed test(s) by shortest run time
tests.integrations.huggingface_hub.test_huggingface_hub::test_bad_chat_completion
Stack Traces | 0.124s run time
.../integrations/huggingface_hub/test_huggingface_hub.py:149: in test_bad_chat_completion
    client.text_generation(prompt="hello")
sentry_sdk/integrations/huggingface_hub.py:84: in new_text_generation
    raise e from None
sentry_sdk/integrations/huggingface_hub.py:80: in new_text_generation
    res = f(*args, **kwargs)
.tox/py3.8-huggingface_hub-v0.30.2/lib/python3.8.../huggingface_hub/inference/_client.py:2351: in text_generation
    request_parameters = provider_helper.prepare_request(
.tox/py3.8-huggingface_hub-v0.30.2/lib/python3.8.../inference/_providers/_common.py:64: in prepare_request
    mapped_model = self._prepare_mapped_model(model)
.tox/py3.8-huggingface_hub-v0.30.2/lib/python3.8.../inference/_providers/hf_inference.py:35: in _prepare_mapped_model
    _check_supported_task(model_id, self.task)
.tox/py3.8-huggingface_hub-v0.30.2/lib/python3.8.../inference/_providers/hf_inference.py:164: in _check_supported_task
    raise ValueError(
E   ValueError: Model 'mistralai/Mistral-Nemo-Instruct-2407' doesn't support task 'text-generation'. Supported tasks: 'None', got: 'text-generation'
tests.integrations.huggingface_hub.test_huggingface_hub::test_bad_chat_completion
Stack Traces | 0.124s run time
.../integrations/huggingface_hub/test_huggingface_hub.py:149: in test_bad_chat_completion
    client.text_generation(prompt="hello")
sentry_sdk/integrations/huggingface_hub.py:84: in new_text_generation
    raise e from None
sentry_sdk/integrations/huggingface_hub.py:80: in new_text_generation
    res = f(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^
.tox/py3.12-huggingface_hub-v0.33.4/lib/python3.12.../huggingface_hub/inference/_client.py:2297: in text_generation
    request_parameters = provider_helper.prepare_request(
.tox/py3.12-huggingface_hub-v0.33.4/lib/python3.12.../inference/_providers/_common.py:93: in prepare_request
    provider_mapping_info = self._prepare_mapping_info(model)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.tox/py3.12-huggingface_hub-v0.33.4/lib/python3.12.../inference/_providers/hf_inference.py:38: in _prepare_mapping_info
    _check_supported_task(model_id, self.task)
.tox/py3.12-huggingface_hub-v0.33.4/lib/python3.12.../inference/_providers/hf_inference.py:187: in _check_supported_task
    raise ValueError(
E   ValueError: Model 'mistralai/Mistral-Nemo-Instruct-2407' doesn't support task 'text-generation'. Supported tasks: 'None', got: 'text-generation'
tests.integrations.huggingface_hub.test_huggingface_hub::test_nonstreaming_chat_completion[False-True-False]
Stack Traces | 0.125s run time
.../integrations/huggingface_hub/test_huggingface_hub.py:56: in test_nonstreaming_chat_completion
    response = client.text_generation(
sentry_sdk/integrations/huggingface_hub.py:84: in new_text_generation
    raise e from None
sentry_sdk/integrations/huggingface_hub.py:80: in new_text_generation
    res = f(*args, **kwargs)
.tox/py3.8-huggingface_hub-v0.30.2/lib/python3.8.../huggingface_hub/inference/_client.py:2351: in text_generation
    request_parameters = provider_helper.prepare_request(
.tox/py3.8-huggingface_hub-v0.30.2/lib/python3.8.../inference/_providers/_common.py:64: in prepare_request
    mapped_model = self._prepare_mapped_model(model)
.tox/py3.8-huggingface_hub-v0.30.2/lib/python3.8.../inference/_providers/hf_inference.py:35: in _prepare_mapped_model
    _check_supported_task(model_id, self.task)
.tox/py3.8-huggingface_hub-v0.30.2/lib/python3.8.../inference/_providers/hf_inference.py:164: in _check_supported_task
    raise ValueError(
E   ValueError: Model 'mistralai/Mistral-Nemo-Instruct-2407' doesn't support task 'text-generation'. Supported tasks: 'None', got: 'text-generation'


srothh added 7 commits July 21, 2025 11:44
…ted a sync transport HTTP subclass

Moved shared sync/async logic into a new superclass (HttpTransportCore) and moved sync-transport-specific code into a new subclass (BaseSyncHttpTransport), from which the current transport implementations inherit.

Fixes GH-4568
Removed an unnecessary TODO message and reverted a class name change for BaseHTTPTransport.

GH-4568
Adds test coverage for the error handling path when HTTP requests return
error status codes.

GH-4568
Restore comments accidentally removed in a previous commit.
Refactored class names so that BaseHttpTransport now has the same functionality as before the hierarchy refactor.

GH-4568
Add a new flush_async method to the Transport ABC. This is needed for the async transport: calling it from the client while preserving execution order in close requires flush to be a coroutine, not a plain function.

GH-4568
Move flush_async down to the specific async transport subclass. This makes more sense anyway, as it will only be required by the async transport. If more async transports are expected, another shared superclass can be created.

GH-4568
@srothh srothh force-pushed the srothh/worker-class-hierarchy branch from 1261319 to 2896602 on July 21, 2025 09:58
@srothh srothh force-pushed the srothh/async-task-worker branch from 7ada4b3 to 1a129f7 on July 21, 2025 10:42
srothh added 6 commits July 23, 2025 15:51
Add necessary type annotations to the core HttpTransport to accommodate the async transport.

GH-4568
Add an abstract base class for implementations of the background worker. This provides a shared interface for the current threaded worker in the sync context as well as the upcoming async task-based worker implementation.

GH-4578
Add a new factory method instead of direct instantiation of the threaded background worker.
This allows for easy extension to other types of workers, such as the upcoming task-based async worker.

GH-4578
Add a new flush_async method to worker ABC. This is necessary because the async transport cannot use a
synchronous blocking flush.

GH-4578
Move flush_async down to the concrete subclass so as not to break existing tests. This makes sense,
as it will only really be needed by the async worker and is therefore not shared logic.

GH-4578
Coroutines have a return value; however, the current function signature for the worker methods does not
accommodate this. Therefore, the signature was changed.

GH-4578
@srothh srothh force-pushed the srothh/worker-class-hierarchy branch from 2896602 to 268ea1a on July 23, 2025 14:02
srothh added 9 commits July 23, 2025 16:03
Add a new implementation of the worker interface, implementing the worker as an async task. This is
to be used by the upcoming async transport.

GH-4581
Refactor the flush method in the async worker to use the async_flush coroutine.

GH-4581
…unctions

Add a check to see whether callbacks are awaitable coroutines or plain functions, as coroutines need to be awaited.

GH-4581
…coroutines

Coroutines may return values other than None, so this needs to be considered in the callback parameter of the worker. Previously,
only callbacks with return type None were accepted.

GH-4581
Enable concurrent callbacks on async task worker by firing them as a task rather than awaiting them. A done callback handles the necessary queue and exception logic.

GH-4581
Changed kill to also use the _TERMINATOR sentinel, so on kill the queue is still drained up to that point instead of being cancelled immediately. This should also fix potential race conditions with flush_async.

GH-4581
Add proper type annotation to worker task list to fix linting problems

GH-4581
@srothh srothh force-pushed the srothh/async-task-worker branch from 1a129f7 to 97c5e3d on July 23, 2025 14:04
@srothh srothh marked this pull request as ready for review July 24, 2025 07:49
@srothh srothh requested a review from a team as a code owner July 24, 2025 07:49
antonpirker (Member) left a comment

Really great work. I have some comments for improvement.

Comment on lines +197 to +200
try:
    self._queue.put_nowait(_TERMINATOR)
except asyncio.QueueFull:
    logger.debug("async worker queue full, kill failed")
Member:

Could we use the full() method from below? This way we would only have one way to check if the queue is full.

srothh (Member, Author):

I did it this way because the threaded worker does the same, but I think there should be no functional difference, so yes, it is probably nicer!
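
A minimal sketch of the suggested change, assuming the worker exposes the full() helper the reviewer refers to (names follow this PR's excerpts):

def kill(self) -> None:
    # Hypothetical variant: reuse the worker's own full() check instead
    # of catching asyncio.QueueFull. Safe in a single event loop because
    # no await happens between the check and put_nowait.
    if self.full():
        logger.debug("async worker queue full, kill failed")
    else:
        self._queue.put_nowait(_TERMINATOR)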

    pending = self._queue.qsize() + 1
    logger.error("flush timed out, dropped %s events", pending)

async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None:
Member:

Why is this called flush_async and not flush? Shouldn't there be a flush() that maybe calls flush_async, so all the workers can be used the same way?

srothh (Member, Author):

This is how I had it initially. However, when integrating this with the SDK there is an issue:

The client exposes a synchronous flush/close method, and close expects flush to fully complete before shutting down the transport. If the worker/transport used a synchronous flush method, it could only create an async flush task (which is necessary to avoid deadlocking and to use the async queue) in a fire-and-forget way. This means there is no real way to properly order the flush before the shutdown in the client, unless the client itself spawns an async flush task using the transport.

I could rename it back to flush and keep it a coroutine; I just thought this way it is less confusing.
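
To illustrate the ordering problem (a hypothetical sketch, not this PR's API): a synchronous flush wrapper on the async worker could only schedule the coroutine and return immediately, so a synchronous close() has no way to wait for it:

def flush(self, timeout: float) -> None:
    # Fire-and-forget: the task is scheduled but not awaited, so the
    # caller returns before flushing has actually completed.
    self._loop.create_task(self.flush_async(timeout))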


async def flush_async(self, timeout: float, callback: Optional[Any] = None) -> None:
    logger.debug("background worker got flush request")
    if self.is_alive and timeout > 0.0:
Member:

I know this is also in the BackgroundWorker implementation, but do you know why a timeout of 0.0 should not flush the worker?

srothh (Member, Author):

I am not really sure. But the client falls back to a "shutdown_timeout" option if the timeout is not set. Maybe setting shutdown_timeout to 0.0 is supposed to immediately quit the application on a call to close?

# Create a strong reference to the task so it can be cancelled on kill
# and does not get garbage collected while running
self._active_tasks.add(task)
task.add_done_callback(self._on_task_complete)
Member:

Maybe we should add the done callback before adding it to _active_tasks. (Just a gut feeling that the task might be finishing before the callback is in place..)

srothh (Member, Author):

I actually oriented myself on the official documentation for add_done_callback here. However, if I understand it correctly, I do not think it matters, as the task can only start once the event loop regains control from the current function, which should only happen on an await or return. If it is more readable, I can change it, however.
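
A small self-contained demo of the point made above (standard asyncio behavior, not code from this PR): a task created with create_task cannot run until the creating coroutine yields control, so bookkeeping placed right after create_task always runs before the task body:

import asyncio

async def child():
    print("child runs")

async def main():
    task = asyncio.create_task(child())
    # The event loop only regains control at the await below, so this
    # line is guaranteed to run before child() starts.
    print("bookkeeping done before child started")
    await task

asyncio.run(main())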

try:
    await asyncio.wait_for(self._queue.join(), timeout=initial_timeout)
except asyncio.TimeoutError:
    pending = self._queue.qsize() + 1
Member:

Because there could be multiple tasks in self._active_tasks, maybe the more correct version would be self._queue.qsize() + len(self._active_tasks)?

srothh (Member, Author):

So pending should be not just the number of callbacks still waiting in the queue, but the number of non-completed tasks in general? I think this makes sense, thanks!
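
A sketch of the corrected count per this suggestion (assuming _active_tasks holds the in-flight tasks, as in this PR's excerpts):

except asyncio.TimeoutError:
    # Count items still queued plus tasks already dequeued but not
    # yet finished.
    pending = self._queue.qsize() + len(self._active_tasks)
    logger.error("flush timed out, dropped %s events", pending)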

    self._task = self._loop.create_task(self._target())
    self._task_for_pid = os.getpid()
except RuntimeError:
    # There is no event loop running
Member:

I think we should at least log a warning here, so we do not swallow failures silently.
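
A sketch of the suggested change, reconstructed around the excerpt above (the exact message wording is an assumption):

try:
    self._loop = asyncio.get_running_loop()
    self._task = self._loop.create_task(self._target())
    self._task_for_pid = os.getpid()
except RuntimeError:
    # There is no event loop running; warn instead of failing silently.
    logger.warning("No running event loop found, async worker not started")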

    await callback()
else:
    # Callback is a sync function, need to call it
    callback()
Member:

This will block the event loop. Maybe we should do await asyncio.to_thread(callback) here? @sl0thentr0py, what is your take on this: blocking or creating threads?

Member:

Just make a fully async version first; do we really need to mix sync and async? We can patch that on later if really necessary. Please keep it simple for now.

srothh (Member, Author):

After checking again, I agree that this is not needed. I confused this with the flush callbacks from when I initially explored the SDK, which are synchronous functions. Those are, however, processed in the flush method, and I believe only async requests should pass through here currently. Thanks for catching that!
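
A sketch of the simplified, fully async handling that results from this decision (the helper name is hypothetical):

async def _process_callback(self, callback: Any) -> None:
    # Callbacks submitted to the async worker are assumed to be
    # coroutine functions, so they are simply awaited; no sync branch.
    await callback()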


class AsyncWorker(Worker):
    def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
        self._queue: asyncio.Queue[Any] = asyncio.Queue(queue_size)
Member:

I think in older Pythons (3.7-3.9) it can be a problem if a Queue is initialized when there is no running event loop. I would not init it here, but in start() after we have the event loop, something like:

# in start()
self._loop = asyncio.get_running_loop()
if self._queue is None:
    self._queue = asyncio.Queue(self._queue_size)

srothh (Member, Author):

Thank you, I was not aware of this!
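
Expanded into a minimal sketch of the lazy-initialization pattern (Worker and DEFAULT_QUEUE_SIZE come from this PR's module; the Optional typing is an assumption):

import asyncio
from typing import Any, Optional

class AsyncWorker(Worker):
    def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
        # Defer queue creation: on Python 3.7-3.9, asyncio.Queue binds
        # to the current event loop at construction time.
        self._queue: Optional[asyncio.Queue[Any]] = None
        self._queue_size = queue_size

    def start(self) -> None:
        self._loop = asyncio.get_running_loop()
        if self._queue is None:
            self._queue = asyncio.Queue(self._queue_size)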

pending = self._queue.qsize() + 1
logger.debug("%d event(s) pending on flush", pending)
if callback is not None:
    callback(pending, timeout)
Member:

This will block the entire event loop. I guess the callback for the AsyncWorker should always be a coroutine that can be awaited.

srothh (Member, Author):

From what I can tell, this functionality is only used by the SDK's atexit handler, which currently only passes synchronous functions. If the blocking is a problem, run_in_executor could be used? But as this only happens once, on exit, I am not sure it is an issue.
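
For reference, a sketch of the run_in_executor variant mentioned above (offloading the synchronous callback to the default thread pool so the event loop is not blocked):

if callback is not None:
    # Run the sync callback in the default executor; awaiting the
    # returned future keeps the coroutine's control flow intact.
    await self._loop.run_in_executor(None, callback, pending, timeout)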

self._active_tasks.add(task)
task.add_done_callback(self._on_task_complete)
# Yield to let the event loop run other tasks
await asyncio.sleep(0)
Member:

I think this is not necessary, because the callback = await self._queue.get() in the loop also gives up control.

srothh (Member, Author):

I am not 100% sure about this, but if there are already items in the queue, does the await also give up control? If not, it might have the same issue, but I am not sure.
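
A small runnable demo of this concern (standard asyncio, not code from this PR): awaiting get() on a non-empty queue returns without suspending, so the consumer drains every queued item before the other task gets to run:

import asyncio

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()  # returns immediately while items remain
        print("consumed", item)
        queue.task_done()

async def other() -> None:
    print("other task ran")

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)
    task = asyncio.create_task(consumer(queue))
    asyncio.create_task(other())
    await queue.join()
    # Prints "consumed 0/1/2" before "other task ran": the gets never
    # yielded control while the queue still had items.
    task.cancel()

asyncio.run(main())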

@srothh srothh force-pushed the srothh/worker-class-hierarchy branch from 1fbf85f to ef780f3 on July 28, 2025 08:55