Skip to content
Open
Changes from 7 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
3cc5c5f
fix: replace downloads TaskGroup with asyncio.Queue for task decoupling
Barbarella6666666 Apr 22, 2026
25f838f
feat: add download dispatcher that creates independent tasks from queue
Barbarella6666666 Apr 22, 2026
d41ba2c
refactor: wire dispatcher into __call__ and remove downloads TaskGroup
Barbarella6666666 Apr 22, 2026
508b8da
fix: run scrape-done UI callback as standalone task
Barbarella6666666 Apr 22, 2026
83c6d81
Merge remote-tracking branch 'other-repo/fix/immediate-downloads' int…
Barbarella6666666 Apr 24, 2026
193c3e6
fix: RUF006 Store a reference to the return value of `asyncio.create_…
Barbarella6666666 Apr 24, 2026
17fcdf8
refactor: fix ruff formatting
Barbarella6666666 Apr 24, 2026
798d1e6
refactor: add smoke test
NTFSvolume Apr 25, 2026
80b503e
fix: use _done event in dispatcher loop and TaskGroup for shutdown
Barbarella6666666 Apr 27, 2026
5c7285b
refactor: move hide_scrape_panel into _download_dispatcher
Barbarella6666666 Apr 27, 2026
a0e29f3
feat: enable eager task factory to prevent download starvation
Barbarella6666666 Apr 27, 2026
7b7d895
fix: handle cancellation and Ctrl+C in download dispatcher
Barbarella6666666 Apr 27, 2026
90a0282
fix: replace broken TaskGroup+shield drain with asyncio.gather
Barbarella6666666 Apr 27, 2026
5a5986c
fix: remove fake download that intercepted real downloads
Barbarella6666666 Apr 27, 2026
1c341a3
fix: handle Ctrl+C by cancelling dispatcher instead of draining
Barbarella6666666 Apr 27, 2026
c344d65
refactor: replace eager_task_factory with worker pool for downloads
Barbarella6666666 Apr 27, 2026
12cfef7
fix: include pending queue in download queue count
Barbarella6666666 Apr 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions cyberdrop_dl/scrape_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def update(self, item: ScrapeItem) -> None:
@dataclasses.dataclass(slots=True)
class TaskGroups:
scrape: asyncio.TaskGroup
downloads: asyncio.TaskGroup


@dataclasses.dataclass(slots=True)
Expand All @@ -118,14 +117,15 @@ class ScrapeMapper:
_direct_http: DirectHttpFile = dataclasses.field(init=False)
_jdownloader: JDownloader = dataclasses.field(init=False)
_real_debrid: RealDebridCrawler = dataclasses.field(init=False)
_task_groups: TaskGroups = dataclasses.field(
init=False, default_factory=lambda: TaskGroups(asyncio.TaskGroup(), asyncio.TaskGroup())
)
_task_groups: TaskGroups = dataclasses.field(init=False, default_factory=lambda: TaskGroups(asyncio.TaskGroup()))
_seen_urls: set[AbsoluteHttpURL] = dataclasses.field(init=False, default_factory=set)
_crawlers_disabled_at_runtime: set[str] = dataclasses.field(init=False, default_factory=set)
_factory: CrawlerFactory = dataclasses.field(init=False)
tui: ScrapingUI = dataclasses.field(init=False, default_factory=ScrapingUI)
_done: asyncio.Event = dataclasses.field(init=False, default_factory=asyncio.Event)
_pending_downloads: asyncio.Queue[Coroutine[Any, Any, Any] | None] = dataclasses.field(
init=False, default_factory=asyncio.Queue
)

def _scrape_queue(self) -> int:
return sum(f.waiting_items for f in self._factory)
Expand All @@ -147,7 +147,7 @@ def create_task(self, coro: Coroutine[Any, Any, _T]) -> None:
_ = self._task_groups.scrape.create_task(coro)

def create_download_task(self, coro: Coroutine[Any, Any, _T]) -> None:
_ = self._task_groups.downloads.create_task(coro)
self._pending_downloads.put_nowait(coro)

def _init_crawlers(self) -> None:

Expand All @@ -160,6 +160,20 @@ def _init_crawlers(self) -> None:

plugins.load(self.manager)

async def _download_dispatcher(self) -> None:
active: set[asyncio.Task[None]] = set()

while True:
coro = await self._pending_downloads.get()
if coro is None:
break
task = asyncio.create_task(coro)
active.add(task)
task.add_done_callback(active.discard)

if active:
await asyncio.gather(*active, return_exceptions=True)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should wait on the done event. We could actually use a TaskGroup here instead of asyncio.gather.

Suggested change
while True:
coro = await self._pending_downloads.get()
if coro is None:
break
task = asyncio.create_task(coro)
active.add(task)
task.add_done_callback(active.discard)
if active:
await asyncio.gather(*active, return_exceptions=True)
while not self._done.is_set():
coro = await self._pending_downloads.get()
if coro is None:
break
task = asyncio.create_task(coro)
active.add(task)
task.add_done_callback(active.discard)
if active:
async with asyncio.TaskGroup() as tg:
for pending in active:
tg.create_task(pending)


@contextlib.asynccontextmanager
async def __call__(self) -> AsyncGenerator[Self]:
assert not self._done.is_set()
Expand All @@ -174,17 +188,18 @@ async def __call__(self) -> AsyncGenerator[Self]:
self.manager.client_manager,
storage.monitor(self.manager.config.global_settings.general.required_free_space),
self.manager.logs.task_group,
self._task_groups.downloads,
):
dispatcher = asyncio.create_task(self._download_dispatcher())
try:
async with self._task_groups.scrape:
self.manager.scrape_mapper = self

yield self

finally:
# The done event signals that all scraping is done, but there may still be downloads pending
self._done.set()
await self._pending_downloads.put(None)
await dispatcher

async def run(self) -> ScrapeStats:
self._init_crawlers()
Expand All @@ -203,6 +218,7 @@ async def run(self) -> ScrapeStats:
source_name, source = _source(self.manager)
async with contextlib.aclosing(source) as items:
stats = ScrapeStats(source_name)
background_tasks = set()

async def wait_until_scrape_is_done() -> None:
_ = await self._done.wait()
Expand All @@ -211,7 +227,8 @@ async def wait_until_scrape_is_done() -> None:
(crawler.DOMAIN, count) for crawler in self._factory if (count := len(crawler._scraped_items))
)

self.create_download_task(wait_until_scrape_is_done())
task = asyncio.create_task(wait_until_scrape_is_done())
background_tasks.add(task)

async for item in items:
item.children_limits = self.manager.config.settings.download_options.maximum_number_of_children
Expand Down
Loading