17 commits
3cc5c5f  fix: replace downloads TaskGroup with asyncio.Queue for task decoupling (Barbarella6666666, Apr 22, 2026)
25f838f  feat: add download dispatcher that creates independent tasks from queue (Barbarella6666666, Apr 22, 2026)
d41ba2c  refactor: wire dispatcher into __call__ and remove downloads TaskGroup (Barbarella6666666, Apr 22, 2026)
508b8da  fix: run scrape-done UI callback as standalone task (Barbarella6666666, Apr 22, 2026)
83c6d81  Merge remote-tracking branch 'other-repo/fix/immediate-downloads' int… (Barbarella6666666, Apr 24, 2026)
193c3e6  fix: RUF006 Store a reference to the return value of `asyncio.create_… (Barbarella6666666, Apr 24, 2026)
17fcdf8  refactor: fix ruff formatting (Barbarella6666666, Apr 24, 2026)
798d1e6  refactor: add smoke test (NTFSvolume, Apr 25, 2026)
80b503e  fix: use _done event in dispatcher loop and TaskGroup for shutdown (Barbarella6666666, Apr 27, 2026)
5c7285b  refactor: move hide_scrape_panel into _download_dispatcher (Barbarella6666666, Apr 27, 2026)
a0e29f3  feat: enable eager task factory to prevent download starvation (Barbarella6666666, Apr 27, 2026)
7b7d895  fix: handle cancellation and Ctrl+C in download dispatcher (Barbarella6666666, Apr 27, 2026)
90a0282  fix: replace broken TaskGroup+shield drain with asyncio.gather (Barbarella6666666, Apr 27, 2026)
5a5986c  fix: remove fake download that intercepted real downloads (Barbarella6666666, Apr 27, 2026)
1c341a3  fix: handle Ctrl+C by cancelling dispatcher instead of draining (Barbarella6666666, Apr 27, 2026)
c344d65  refactor: replace eager_task_factory with worker pool for downloads (Barbarella6666666, Apr 27, 2026)
12cfef7  fix: include pending queue in download queue count (Barbarella6666666, Apr 27, 2026)
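
The commit trail tries two fixes for download starvation: a0e29f3 enables asyncio's eager task factory, and c344d65 later replaces it with a bounded worker pool. As context for that first attempt, here is a minimal sketch of what the eager factory changes, assuming Python 3.12+ (the `probe` coroutine is illustrative, not from the PR):

import asyncio

async def probe() -> None:
    # With the eager factory installed, this print runs synchronously
    # inside create_task() itself, before the event loop gets a turn.
    print("started eagerly")
    await asyncio.sleep(0)

async def main() -> None:
    # asyncio.eager_task_factory (Python 3.12+) starts each new task
    # immediately, up to its first await, instead of waiting for the
    # next event loop iteration.
    asyncio.get_running_loop().set_task_factory(asyncio.eager_task_factory)
    task = asyncio.create_task(probe())  # "started eagerly" prints here
    await task

asyncio.run(main())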
1 change: 1 addition & 0 deletions cyberdrop_dl/crawlers/crawler.py
@@ -453,6 +453,7 @@ async def _download(self, media_item: MediaItem, m3u8: m3u8.Rendition | None) ->
         try:
             if SKIP_DOWNLOAD.get():
                 return
+
             if m3u8:
                 await self.downloader.download_hls(media_item, m3u8)
             else:
62 changes: 44 additions & 18 deletions cyberdrop_dl/scrape_mapper.py
@@ -105,7 +105,6 @@ def update(self, item: ScrapeItem) -> None:
 @dataclasses.dataclass(slots=True)
 class TaskGroups:
     scrape: asyncio.TaskGroup
-    downloads: asyncio.TaskGroup
 
 
 @dataclasses.dataclass(slots=True)
@@ -118,20 +117,21 @@ class ScrapeMapper:
     _direct_http: DirectHttpFile = dataclasses.field(init=False)
     _jdownloader: JDownloader = dataclasses.field(init=False)
     _real_debrid: RealDebridCrawler = dataclasses.field(init=False)
-    _task_groups: TaskGroups = dataclasses.field(
-        init=False, default_factory=lambda: TaskGroups(asyncio.TaskGroup(), asyncio.TaskGroup())
-    )
+    _task_groups: TaskGroups = dataclasses.field(init=False, default_factory=lambda: TaskGroups(asyncio.TaskGroup()))
     _seen_urls: set[AbsoluteHttpURL] = dataclasses.field(init=False, default_factory=set)
     _crawlers_disabled_at_runtime: set[str] = dataclasses.field(init=False, default_factory=set)
     _factory: CrawlerFactory = dataclasses.field(init=False)
     tui: ScrapingUI = dataclasses.field(init=False, default_factory=ScrapingUI)
     _done: asyncio.Event = dataclasses.field(init=False, default_factory=asyncio.Event)
+    _pending_downloads: asyncio.Queue[Coroutine[Any, Any, Any] | None] = dataclasses.field(
+        init=False, default_factory=asyncio.Queue
+    )
 
     def _scrape_queue(self) -> int:
         return sum(f.waiting_items for f in self._factory)
 
     def _download_queue(self):
-        total = sum(f.downloader.waiting_items for f in self._factory)
+        total = sum(f.downloader.waiting_items for f in self._factory) + self._pending_downloads.qsize()
         self.tui.files.stats.queued = total
         return total
 
@@ -147,7 +147,7 @@ def create_task(self, coro: Coroutine[Any, Any, _T]) -> None:
         _ = self._task_groups.scrape.create_task(coro)
 
     def create_download_task(self, coro: Coroutine[Any, Any, _T]) -> None:
-        _ = self._task_groups.downloads.create_task(coro)
+        self._pending_downloads.put_nowait(coro)
 
     def _init_crawlers(self) -> None:
 
@@ -160,6 +160,32 @@ def _init_crawlers(self) -> None:
 
         plugins.load(self.manager)
 
+    async def _download_worker(self) -> None:
+        while True:
+            coro = await self._pending_downloads.get()
+            if coro is None:
+                break
+            await coro
+
+    async def _download_dispatcher(self) -> None:
+        workers: list[asyncio.Task[None]] = []
+        try:
+            max_workers = self.manager.config.global_settings.rate_limiting_options.max_simultaneous_downloads
+            workers = [asyncio.create_task(self._download_worker()) for _ in range(max_workers)]
+            await self._done.wait()
+            self.tui.hide_scrape_panel()
+            for _ in workers:
+                await self._pending_downloads.put(None)
+            await asyncio.gather(*workers)
+        except (asyncio.CancelledError, KeyboardInterrupt):
+            for task in workers:
+                task.cancel()
+            while not self._pending_downloads.empty():
+                coro = self._pending_downloads.get_nowait()
+                if coro is not None:
+                    coro.close()
+            raise
+
     @contextlib.asynccontextmanager
     async def __call__(self) -> AsyncGenerator[Self]:
         assert not self._done.is_set()
@@ -174,17 +200,22 @@ async def __call__(self) -> AsyncGenerator[Self]:
             self.manager.client_manager,
             storage.monitor(self.manager.config.global_settings.general.required_free_space),
             self.manager.logs.task_group,
-            self._task_groups.downloads,
         ):
+            dispatcher = asyncio.create_task(self._download_dispatcher())
             try:
                 async with self._task_groups.scrape:
                     self.manager.scrape_mapper = self
 
                     yield self
 
-            finally:
-                # The done event signals that all scraping is done, but there may still be downloads pending
+            except BaseException:
+                dispatcher.cancel()
+                with contextlib.suppress(asyncio.CancelledError):
+                    await dispatcher
+                raise
+            else:
                 self._done.set()
+                await dispatcher
 
     async def run(self) -> ScrapeStats:
         self._init_crawlers()
@@ -204,15 +235,6 @@ async def run(self) -> ScrapeStats:
         async with contextlib.aclosing(source) as items:
             stats = ScrapeStats(source_name)
 
-            async def wait_until_scrape_is_done() -> None:
-                _ = await self._done.wait()
-                self.tui.hide_scrape_panel()
-                stats.url_count.update(
-                    (crawler.DOMAIN, count) for crawler in self._factory if (count := len(crawler._scraped_items))
-                )
-
-            self.create_download_task(wait_until_scrape_is_done())
-
             async for item in items:
                 item.children_limits = self.manager.config.settings.download_options.maximum_number_of_children
                 if self._should_scrape(item):
@@ -221,6 +243,10 @@ async def wait_until_scrape_is_done() -> None:
                     stats.update(item)
                     self.create_task(self._send_to_crawler(item))
 
+            stats.url_count.update(
+                (crawler.DOMAIN, count) for crawler in self._factory if (count := len(crawler._scraped_items))
+            )
+
             if not stats.count:
                 logger.warning("No valid links found")
 
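
The new _download_worker / _download_dispatcher pair is a standard asyncio queue-with-sentinel worker pool. A minimal self-contained sketch of the same pattern, with a hypothetical fake_download coroutine standing in for the real downloader:

import asyncio
from collections.abc import Coroutine
from typing import Any

async def worker(queue: asyncio.Queue[Coroutine[Any, Any, Any] | None]) -> None:
    # Mirrors _download_worker: pull coroutines until the None sentinel.
    while True:
        coro = await queue.get()
        if coro is None:
            return
        await coro

async def fake_download(name: str) -> None:
    # Hypothetical stand-in for a real download coroutine.
    await asyncio.sleep(0.1)
    print(f"downloaded {name}")

async def main() -> None:
    queue: asyncio.Queue[Coroutine[Any, Any, Any] | None] = asyncio.Queue()
    # A fixed pool bounds concurrency, as max_simultaneous_downloads does above.
    workers = [asyncio.create_task(worker(queue)) for _ in range(3)]
    for i in range(10):
        queue.put_nowait(fake_download(f"file-{i}"))
    for _ in workers:
        await queue.put(None)  # one sentinel per worker for a clean shutdown
    await asyncio.gather(*workers)

asyncio.run(main())

On the cancellation path, the PR's dispatcher instead drains the queue and calls close() on each un-started coroutine, which avoids "coroutine was never awaited" warnings for work that will never run.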