Update _read_parallel to use a queue #13827

Open · wants to merge 3 commits into master
55 changes: 39 additions & 16 deletions sphinx/builders/__init__.py
@@ -3,7 +3,9 @@
from __future__ import annotations

import codecs
import multiprocessing
import pickle
import queue
import re
import time
from contextlib import nullcontext
@@ -590,39 +592,60 @@ def _read_serial(self, docnames: list[str]) -> None:
            self.read_doc(docname)

    def _read_parallel(self, docnames: list[str], nproc: int) -> None:
        chunks = make_chunks(docnames, nproc)
        # status iterator is currently not very helpful; it only notifies when workers join back.
        # in theory there could be a status update queue that runs in the main thread.

        # create a status_iterator to step progressbar after reading a document
        # (see: ``merge()`` function)
        progress = status_iterator(
            chunks,
            __('reading sources... '),
            range(nproc),
            __('Worker returned.. '),
            'purple',
            len(chunks),
            nproc,
            self.config.verbosity,
        )

        # clear all outdated docs at once
        # clear all outdated docs at once so that forked environments are cleared
        for docname in docnames:
            self.events.emit('env-purge-doc', self.env, docname)
            self.env.clear_doc(docname)

        def read_process(docs: list[str]) -> bytes:
            self.env._app = self._app
            for docname in docs:
                self.read_doc(docname, _cache=False)
            # allow pickling self to send it back
            return pickle.dumps(self.env, pickle.HIGHEST_PROTOCOL)
        work_queue: multiprocessing.Queue[str | None] = multiprocessing.Queue()

        # load up all docs to be processed
        for doc in docnames:
            work_queue.put(doc)

        def merge(docs: list[str], otherenv: bytes) -> None:
            env = pickle.loads(otherenv)
            self.env.merge_info_from(docs, env, self._app)
        # Add sentinel values to stop workers after all tasks are done
        # Not sure if there is some edge case where this is not reliable enough
        for _ in range(nproc):
            work_queue.put(None)

        def read_worker(work_queue: multiprocessing.Queue[str]) -> bytes:
            self.env._app = self._app
            processed_docs = []
            while True:
                try:
                    task = work_queue.get(timeout=1)  # timeout to allow graceful exit
                except queue.Empty:
                    break
                if task is None:
                    break
                self.read_doc(task, _cache=False)
                processed_docs.append(task)
            # Return both processed docs and environment
            return pickle.dumps((processed_docs, self.env), pickle.HIGHEST_PROTOCOL)

        def merge(work_queue: multiprocessing.Queue[str], results: bytes) -> None:
            processed_docs, env = pickle.loads(results)
            # Only merge the documents that this worker actually processed
            self.env.merge_info_from(processed_docs, env, self._app)
            next(progress)

        # Spawn nproc workers to work through queue
        tasks = ParallelTasks(nproc)
        for chunk in chunks:
            tasks.add_task(read_process, chunk, merge)
        for _ in range(nproc):
            tasks.add_task(read_worker, work_queue, merge)

        # make sure all threads have finished
        tasks.join()
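
For readers skimming the diff, the control flow of the new read_worker/merge pair can be seen in isolation in the sketch below. This is a minimal stand-alone illustration of the queue-plus-sentinel pattern, not code from the PR: the names read_one, worker and WORKERS, and the use of a separate result queue, are assumptions made for the example; in the PR itself results travel back to the parent through ParallelTasks and pickle instead.

# Minimal, self-contained sketch of the queue + sentinel worker pattern
# (standard library only; all names here are illustrative, not from the PR).
from __future__ import annotations

import multiprocessing
import queue


def read_one(docname: str) -> str:
    # Stand-in for Builder.read_doc(); any per-document work goes here.
    return docname.upper()


def worker(work_q: multiprocessing.Queue, result_q: multiprocessing.Queue) -> None:
    processed: list[tuple[str, str]] = []
    while True:
        try:
            task = work_q.get(timeout=1)  # safety net, mirrors the PR's timeout
        except queue.Empty:
            break
        if task is None:  # sentinel: no more work for this worker
            break
        processed.append((task, read_one(task)))
    # Report back only what this worker actually handled, mirroring the
    # (processed_docs, env) tuple that read_worker() pickles in the PR.
    result_q.put(processed)


if __name__ == '__main__':
    WORKERS = 3
    docnames = [f'doc{i}' for i in range(10)]

    work_q: multiprocessing.Queue = multiprocessing.Queue()
    result_q: multiprocessing.Queue = multiprocessing.Queue()
    for name in docnames:
        work_q.put(name)
    for _ in range(WORKERS):  # one sentinel per worker
        work_q.put(None)

    procs = [
        multiprocessing.Process(target=worker, args=(work_q, result_q))
        for _ in range(WORKERS)
    ]
    for proc in procs:
        proc.start()

    merged: dict[str, str] = {}
    for _ in range(WORKERS):  # "merge" step: one result set per worker
        merged.update(dict(result_q.get()))
    for proc in procs:
        proc.join()

    assert sorted(merged) == sorted(docnames)

Because every worker is handed exactly one sentinel, the loop normally exits through the task is None branch; the one-second get timeout only matters if a sentinel somehow goes missing, which is the edge case the in-diff comment hedges about. Handing each worker its own slice of results is also why merge() now steps the progress bar once per worker, and why status_iterator iterates over range(nproc) rather than over document chunks.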