refactor: Refactor Download Management to Prevent Task Starvation#1713
Barbarella6666666 wants to merge 17 commits into
Conversation
Downloads were starved because the asyncio scheduler prioritized the many scraping tasks over download tasks in the same event loop. Introduce an `asyncio.Queue` as the handoff mechanism so download coroutines are dispatched independently of scraping pressure.
A single dispatcher task drains the `asyncio.Queue` and spawns each download as its own `asyncio.Task`. This preserves the original concurrency model, where semaphores (server/domain/global) control parallelism, while ensuring downloads are no longer starved by scraping tasks.
Replace the downloads `TaskGroup` context manager with the new dispatcher task. On shutdown, a `None` sentinel is sent to the queue so the dispatcher drains remaining downloads before exiting. The `TaskGroups` dataclass no longer needs a `downloads` field.
`wait_until_scrape_is_done` is a UI notification, not a download. It was piggybacking on the downloads `TaskGroup`; now it runs as an independent `asyncio.create_task` so it does not occupy the download queue.
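The handoff described above can be sketched as a minimal, self-contained model (`_pending_downloads`, `_download_dispatcher`, and the trivial `_download` here are simplified stand-ins for the PR's actual code, not the real implementation):

```python
import asyncio


class Downloader:
    """Toy model of the queue + dispatcher handoff from this PR."""

    def __init__(self) -> None:
        self._pending_downloads: asyncio.Queue = asyncio.Queue()
        self.finished: list[str] = []

    async def _download(self, url: str) -> None:
        await asyncio.sleep(0)  # real code would await the HTTP transfer here
        self.finished.append(url)

    async def _download_dispatcher(self) -> None:
        active: set[asyncio.Task] = set()
        while True:
            coro = await self._pending_downloads.get()
            if coro is None:  # sentinel: scraping is done
                break
            task = asyncio.create_task(coro)
            active.add(task)
            task.add_done_callback(active.discard)
        # Drain whatever is still running before exiting.
        if active:
            await asyncio.gather(*active, return_exceptions=True)


async def main() -> list[str]:
    dl = Downloader()
    dispatcher = asyncio.create_task(dl._download_dispatcher())
    # Scrapers hand coroutines to the queue instead of a TaskGroup.
    for url in ("a", "b", "c"):
        await dl._pending_downloads.put(dl._download(url))
    await dl._pending_downloads.put(None)  # shutdown sentinel
    await dispatcher
    return dl.finished


print(sorted(asyncio.run(main())))
```

Because the dispatcher is its own task, downloads get scheduled as soon as they are enqueued, regardless of how many scraping coroutines are competing for the loop.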
Yeah, this is the approach I mentioned on #1702. I didn't explain why, but the reason I didn't want to use a queue is that we lose all the error handling that the task group does for us on cancellation (which we never do, but the user could hit Ctrl+C).
This will work though, so I'm not completely against it. I will do a proper review and give it a try later today.
We will need to add logic to cancel all tasks in the queue on error to get similar behavior to a taskgroup (maybe use a taskgroup within?) to prevent the classic …
Ideally, we should use a queue of …
NTFSvolume
left a comment
This is still not getting the desired behavior for me. A bunch of pages get scraped before downloads start. I think we need eager tasks for this to work.
Also, as I expected, it is swallowing any `CTRL + C` I hit.
```python
while True:
    coro = await self._pending_downloads.get()
    if coro is None:
        break
    task = asyncio.create_task(coro)
    active.add(task)
    task.add_done_callback(active.discard)

if active:
    await asyncio.gather(*active, return_exceptions=True)
```
You should wait on the done event. We can actually use a `TaskGroup`:
```diff
-while True:
-    coro = await self._pending_downloads.get()
-    if coro is None:
-        break
-    task = asyncio.create_task(coro)
-    active.add(task)
-    task.add_done_callback(active.discard)
-if active:
-    await asyncio.gather(*active, return_exceptions=True)
+while not self._done.is_set():
+    coro = await self._pending_downloads.get()
+    if coro is None:
+        break
+    task = asyncio.create_task(coro)
+    active.add(task)
+    task.add_done_callback(active.discard)
+if active:
+    async with asyncio.TaskGroup() as tg:
+        for pending in active:
+            tg.create_task(pending)
```
```diff
@@ -211,7 +227,8 @@ async def wait_until_scrape_is_done() -> None:
         (crawler.DOMAIN, count) for crawler in self._factory if (count := len(crawler._scraped_items))
     )

-    self.create_download_task(wait_until_scrape_is_done())
+    task = asyncio.create_task(wait_until_scrape_is_done())
+    background_tasks.add(task)
```
We can move the `hide_scrape_panel` call to `_download_dispatcher`.
Added a commit so we can simulate downloads without actually downloading anything. I tested with …
Check `self._done.is_set()` in the loop condition instead of `while True`. Drain remaining queued coroutines after the loop exits. Use `TaskGroup` instead of `asyncio.gather` for the final active tasks, restoring proper error propagation and Ctrl+C handling.
The dispatcher already knows when scraping is done (it receives the `None` sentinel). Move the UI callback there instead of using a standalone task. Collect `url_count` stats after the scrape loop in `run()`.
On Python 3.12+, set `asyncio.eager_task_factory` on the event loop so that tasks created via `asyncio.create_task()` begin executing immediately instead of waiting for the next scheduler cycle. This is the key fix for downloads not starting while scraping is active. On Python 3.11, fall back to default behavior (no eager scheduling available).
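A minimal demonstration of the scheduling difference (the `child`/`order` names exist only for this demo): with the eager factory installed, a task runs synchronously up to its first suspension point inside `create_task()` itself, instead of waiting for the next loop iteration.

```python
import asyncio
import sys


async def main() -> str:
    loop = asyncio.get_running_loop()
    if sys.version_info >= (3, 12):
        # Eager tasks begin executing immediately inside create_task().
        loop.set_task_factory(asyncio.eager_task_factory)

    order: list[str] = []

    async def child() -> None:
        order.append("child")

    asyncio.create_task(child())
    order.append("parent")
    await asyncio.sleep(0)  # lets a lazily-scheduled child run (3.11 path)
    return ",".join(order)


# On 3.12+ with the eager factory this prints "child,parent";
# on 3.11 (lazy scheduling) it prints "parent,child".
print(asyncio.run(main()))
```

This is exactly why downloads were starved: a lazily-created download task only ran once the loop got around to it, and the scraping tasks kept the loop busy.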
On `CancelledError` or `KeyboardInterrupt`, cancel all active download tasks and close any unawaited coroutines still in the queue to avoid "coroutine was never awaited" warnings.
`asyncio.shield()` returns a `Future`, not a coroutine, so `tg.create_task(asyncio.shield(task))` raises `TypeError`. The `active` set contains `Task`s that are already running, not coroutines, so we simply gather them directly.
The `_fake_download` method simulated download progress with a fixed 10GB size, preventing actual downloads from running. Removed the method, its call in `_download`, and the `random` import that was only used by it.
The `finally` block always drained all active downloads before exiting, effectively swallowing `KeyboardInterrupt`. Split into `except`/`else`/`finally`: on error or Ctrl+C, cancel the dispatcher and propagate immediately; on normal exit, drain the queue and wait for downloads to finish.
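The resulting control flow looks roughly like this (a simplified sketch with illustrative names; the dispatcher here awaits jobs sequentially rather than spawning tasks, to keep the error-handling shape visible):

```python
import asyncio


async def run(names: list[str], fail: bool = False) -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    done: list[str] = []

    async def job(name: str) -> None:
        done.append(name)

    async def dispatcher() -> None:
        # Consume queued download coroutines until the shutdown sentinel.
        while (coro := await queue.get()) is not None:
            await coro

    task = asyncio.create_task(dispatcher())
    try:
        for name in names:  # stands in for the scrape loop
            await queue.put(job(name))
            if fail:
                raise KeyboardInterrupt  # simulate Ctrl+C mid-scrape
    except (KeyboardInterrupt, asyncio.CancelledError):
        task.cancel()  # propagate immediately instead of draining
        raise
    else:
        await queue.put(None)  # normal exit: signal shutdown...
        await task             # ...and drain remaining downloads
    return done


print(asyncio.run(run(["a", "b"])))
```

The key point is that draining only happens in the `else` branch, so a Ctrl+C during scraping is no longer swallowed by a drain in `finally`.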
Instead of relying on `eager_task_factory` (Python 3.12+ only), spawn a fixed pool of download workers that consume from the queue. The number of workers is read from the `max_simultaneous_downloads` config. This works on Python 3.11+ and respects the user's concurrency limit.
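That worker-pool approach (which the review below rejects) looks roughly like this sketch; `run_pool`, `worker`, and `fake_download` are illustrative names, with `max_simultaneous_downloads` mirroring the config option:

```python
import asyncio


async def run_pool(jobs, max_simultaneous_downloads: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []

    async def worker() -> None:
        # Each worker consumes download coroutines until its sentinel.
        while (coro := await queue.get()) is not None:
            results.append(await coro)

    workers = [
        asyncio.create_task(worker())
        for _ in range(max_simultaneous_downloads)
    ]
    for job in jobs:
        await queue.put(job)
    for _ in workers:
        await queue.put(None)  # one sentinel per worker
    await asyncio.gather(*workers)
    return results


async def fake_download(n: int) -> int:
    await asyncio.sleep(0)
    return n


print(sorted(asyncio.run(run_pool([fake_download(i) for i in range(5)]))))
```

Note that concurrency is now capped by the number of workers first, before any per-domain semaphore is even reached, which is the ordering problem raised next.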
The download queue counter only counted items waiting on semaphores inside the downloader. Items still in the pending queue waiting for a worker were not counted, causing the UI to show the worker count instead of the actual queue size.
We can't use workers either, because then the number of concurrent downloads has the most relevance in the concurrency decision, when it should be the least relevant.
For example, if we use a config max of 10 concurrent downloads and the first 10 downloads CDL finds are from bunkr, all from the same bunkr CDN, CDL will only download 1 file at a time.
All the download workers will be blocked by the bunkr lock. Downloads from other sites won't start until all downloads from bunkr are finished.
This is the current logic to start downloads: `cyberdrop-dl/cyberdrop_dl/downloader/downloader.py`, lines 119 to 130 at d56554d. The locks go from most restrictive (narrow scope) to least restrictive (wide scope). Putting all downloads on the same queue with a fixed number of workers will make downloads from different sites block each other.
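A runnable illustration of that blocking effect under the worker model (the semaphores and names are simplified stand-ins for CDL's real per-domain locks, not its actual code): with a 1-permit bunkr semaphore and only two workers, a download from another site cannot start until a bunkr download finishes and frees a worker, even though the other site's lock was free the whole time.

```python
import asyncio


async def main() -> list[str]:
    log: list[str] = []
    bunkr_cdn = asyncio.Semaphore(1)   # narrow per-CDN limit
    other_site = asyncio.Semaphore(1)

    async def download(name: str, sem: asyncio.Semaphore) -> None:
        async with sem:  # narrowest lock acquired first, as in the real ordering
            log.append(f"start {name}")
            await asyncio.sleep(0.01)
            log.append(f"end {name}")

    queue: asyncio.Queue = asyncio.Queue()
    for item in [("bunkr-1", bunkr_cdn), ("bunkr-2", bunkr_cdn),
                 ("other-1", other_site)]:
        queue.put_nowait(item)
    for _ in range(2):
        queue.put_nowait(None)  # one sentinel per worker

    async def worker() -> None:
        while (item := await queue.get()) is not None:
            await download(*item)

    await asyncio.gather(worker(), worker())  # only 2 workers
    return log


log = asyncio.run(main())
# "other-1" only starts after "bunkr-1" ends: both workers were tied up
# by bunkr items, one downloading and one parked on the bunkr semaphore.
assert log.index("start other-1") > log.index("end bunkr-1")
```

With the dispatcher model, every queued download becomes its own task, so `other-1` would start immediately and only bunkr items would wait on the bunkr lock.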
I'm thinking of just dropping Python 3.11 support. We can force eager tasks with 3.12+ and just let the event loop manage them as best as it can, instead of trying to fix this ourselves. @jbsparrow, do you think it's OK if we drop 3.11?
Honestly, I think that this is the best solution. It's unfortunate to drop support for a Python version, but I think our users are fine with upgrading. We don't want to implement some hacky solution for a few versions when we will inevitably be upgrading eventually anyway. This solution is much simpler and easier for us. I think it should be implemented in v10.0, along with the database migration system. I will work on that more today and upload the branch.
That is the best solution for sure.
This is my take on trying to solve #1702
Replaces the `TaskGroup` in `scrape_mapper.py` with an unbounded `asyncio.Queue` and a background dispatcher task. This architectural shift ensures that file downloads are scheduled immediately, solving the starvation issues encountered when scraping high volumes of data.
Instead of batching downloads within a `TaskGroup`, we now pipe them into an internal queue. A dedicated dispatcher task consumes this queue in a loop, spawning a new independent `asyncio.Task` for every download.
Discarded Approaches