diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py
index 2dd972ecfe0..e58242ed9db 100644
--- a/sphinx/builders/__init__.py
+++ b/sphinx/builders/__init__.py
@@ -3,7 +3,9 @@
 from __future__ import annotations

 import codecs
+import multiprocessing
 import pickle
+import queue
 import re
 import time
 from contextlib import nullcontext
@@ -590,39 +592,60 @@ def _read_serial(self, docnames: list[str]) -> None:
             self.read_doc(docname)

     def _read_parallel(self, docnames: list[str], nproc: int) -> None:
-        chunks = make_chunks(docnames, nproc)
+        # The status iterator is currently not very helpful: it only notifies when a worker joins back.
+        # In theory there could be a status update queue that runs in the main thread.

         # create a status_iterator to step progressbar after reading a document
         # (see: ``merge()`` function)
         progress = status_iterator(
-            chunks,
-            __('reading sources... '),
+            range(nproc),
+            __('worker returned... '),
             'purple',
-            len(chunks),
+            nproc,
             self.config.verbosity,
         )

-        # clear all outdated docs at once
+        # clear all outdated docs at once so that forked environments are cleared
         for docname in docnames:
             self.events.emit('env-purge-doc', self.env, docname)
             self.env.clear_doc(docname)

-        def read_process(docs: list[str]) -> bytes:
-            self.env._app = self._app
-            for docname in docs:
-                self.read_doc(docname, _cache=False)
-            # allow pickling self to send it back
-            return pickle.dumps(self.env, pickle.HIGHEST_PROTOCOL)
+        work_queue: multiprocessing.Queue[str | None] = multiprocessing.Queue()
+
+        # load up all docs to be processed
+        for doc in docnames:
+            work_queue.put(doc)

-        def merge(docs: list[str], otherenv: bytes) -> None:
-            env = pickle.loads(otherenv)
-            self.env.merge_info_from(docs, env, self._app)
+        # Add sentinel values to stop workers after all tasks are done.
+        # Not sure if there is some edge case where this is not reliable enough.
+        for _ in range(nproc):
+            work_queue.put(None)
+
+        def read_worker(work_queue: multiprocessing.Queue[str | None]) -> bytes:
+            self.env._app = self._app
+            processed_docs = []
+            while True:
+                try:
+                    task = work_queue.get(timeout=1)  # timeout to allow graceful exit
+                except queue.Empty:
+                    break
+                if task is None:
+                    break
+                self.read_doc(task, _cache=False)
+                processed_docs.append(task)
+            # Return both the processed docs and the environment
+            return pickle.dumps((processed_docs, self.env), pickle.HIGHEST_PROTOCOL)
+
+        def merge(work_queue: multiprocessing.Queue[str | None], results: bytes) -> None:
+            processed_docs, env = pickle.loads(results)
+            # Only merge the documents that this worker actually processed
+            self.env.merge_info_from(processed_docs, env, self._app)
             next(progress)

+        # Spawn nproc workers to work through the queue
         tasks = ParallelTasks(nproc)
-        for chunk in chunks:
-            tasks.add_task(read_process, chunk, merge)
+        for _ in range(nproc):
+            tasks.add_task(read_worker, work_queue, merge)

         # make sure all threads have finished
         tasks.join()
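For context, a minimal standalone sketch of the queue-plus-sentinel pattern the new `_read_parallel` follows: a shared queue is preloaded with every work item plus one `None` sentinel per worker, each worker drains the queue until it sees a sentinel, and the "merge" step only sees the items that worker actually processed. This sketch deliberately uses plain threads and a toy `fake_read_doc` stand-in instead of Sphinx's `ParallelTasks`/environment machinery, so those names are illustrative only.

```python
import queue
import threading


def fake_read_doc(docname: str) -> str:
    # stand-in for Builder.read_doc(); just returns a marker string
    return f'parsed {docname}'


def worker(work_queue: "queue.Queue[str | None]", processed: list[str]) -> None:
    # drain the shared queue until a None sentinel (or an empty queue) is seen
    while True:
        try:
            task = work_queue.get(timeout=1)  # timeout so a worker cannot hang forever
        except queue.Empty:
            break
        if task is None:
            break  # sentinel: no more work for this worker
        fake_read_doc(task)
        processed.append(task)


def main() -> None:
    docnames = [f'doc{i}' for i in range(10)]
    nproc = 3

    work_queue: "queue.Queue[str | None]" = queue.Queue()
    for doc in docnames:
        work_queue.put(doc)
    # one sentinel per worker guarantees every worker eventually sees a stop signal
    for _ in range(nproc):
        work_queue.put(None)

    per_worker: list[list[str]] = [[] for _ in range(nproc)]
    threads = [
        threading.Thread(target=worker, args=(work_queue, per_worker[i]))
        for i in range(nproc)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # the "merge" step reports only the documents each worker actually processed
    for i, docs in enumerate(per_worker):
        print(f'worker {i} processed {len(docs)} documents')


if __name__ == '__main__':
    main()
```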