Use multiprocessing pool for parallelism #13781

Draft · wants to merge 1 commit into master
50 changes: 25 additions & 25 deletions sphinx/builders/__init__.py
@@ -7,6 +7,7 @@
 import re
 import time
 from contextlib import nullcontext
+from multiprocessing.pool import Pool
 from pathlib import Path
 from typing import TYPE_CHECKING, final

@@ -31,7 +32,7 @@
 from sphinx.util.i18n import CatalogRepository, docname_to_domain
 from sphinx.util.osutil import ensuredir, relative_uri, relpath
 from sphinx.util.parallel import (
     ParallelTasks,
Check failure · GitHub Actions / ruff · Ruff (F401): sphinx/builders/__init__.py:35:5: `sphinx.util.parallel.ParallelTasks` imported but unused; consider removing, adding to `__all__`, or using a redundant alias
     SerialTasks,
     make_chunks,
     parallel_available,
@@ -607,25 +608,25 @@
             self.events.emit('env-purge-doc', self.env, docname)
             self.env.clear_doc(docname)

-        def read_process(docs: list[str]) -> bytes:
+        def read_process(docs: list[str]) -> tuple[list[str], bytes]:
             self.env._app = self._app
             for docname in docs:
                 self.read_doc(docname, _cache=False)
             # allow pickling self to send it back
-            return pickle.dumps(self.env, pickle.HIGHEST_PROTOCOL)
+            return docs, pickle.dumps(self.env, pickle.HIGHEST_PROTOCOL)

         def merge(docs: list[str], otherenv: bytes) -> None:
             env = pickle.loads(otherenv)
             self.env.merge_info_from(docs, env, self._app)

-            next(progress)
+        with Pool(processes=nproc) as pool:
+            # run read_process() in parallel
+            results = pool.imap(read_process, chunks)

-        tasks = ParallelTasks(nproc)
-        for chunk in chunks:
-            tasks.add_task(read_process, chunk, merge)
+            for (docs, bytes), _ in zip(results, progress):
Check failure · GitHub Actions / ruff · Ruff (B905): sphinx/builders/__init__.py:626:37: `zip()` without an explicit `strict=` parameter
+                # merge the results back into the main environment
+                merge(docs, bytes)

-        # make sure all threads have finished
-        tasks.join()
         logger.info('')

     @final
Expand Down Expand Up @@ -793,8 +794,17 @@
firstname, docnames = docnames[0], docnames[1:]
_write_docname(firstname, env=self.env, builder=self, tags=self.tags)

tasks = ParallelTasks(nproc)
chunks = make_chunks(docnames, nproc)

input_data = []

Check failure on line 798 in sphinx/builders/__init__.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E303)

sphinx/builders/__init__.py:798:9: E303 Too many blank lines (2)
for docname in docnames:
doctree = self.env.get_and_resolve_doctree(
docname, self, tags=self.tags
)
self.write_doc_serialized(docname, doctree)
input_data.append((docname, doctree))


chunks = make_chunks(input_data, nproc)

Check failure on line 807 in sphinx/builders/__init__.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E303)

sphinx/builders/__init__.py:807:9: E303 Too many blank lines (2)

# create a status_iterator to step progressbar after writing a document
# (see: ``on_chunk_done()`` function)
@@ -806,22 +816,12 @@
             self.config.verbosity,
         )

-        def on_chunk_done(args: list[tuple[str, nodes.document]], result: None) -> None:
-            next(progress)
-
-        self.phase = BuildPhase.RESOLVING
-        for chunk in chunks:
-            arg = []
-            for docname in chunk:
-                doctree = self.env.get_and_resolve_doctree(
-                    docname, self, tags=self.tags
-                )
-                self.write_doc_serialized(docname, doctree)
-                arg.append((docname, doctree))
-            tasks.add_task(write_process, arg, on_chunk_done)
+        with Pool(processes=nproc) as pool:
+            result = pool.imap(write_process, chunks)
+            for _ in zip(result, progress):
Check failure · GitHub Actions / ruff · Ruff (B905): sphinx/builders/__init__.py:821:22: `zip()` without an explicit `strict=` parameter
+                # just step the progress bar
+                pass

-        # make sure all threads have finished
-        tasks.join()
         logger.info('')

     def prepare_writing(self, docnames: Set[str]) -> None:
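
For readers skimming the diff: both hunks above replace the ParallelTasks/callback machinery with the same pattern, a multiprocessing Pool whose imap() results are consumed in a zip() against the progress iterator, so the progress bar advances once per finished chunk and merging happens serially in the parent. The following is a minimal standalone sketch of that pattern, not Sphinx code; the worker function, chunk contents, and progress iterable are simplified stand-ins, and strict=True is shown only as one way to satisfy the Ruff B905 warnings flagged above.

# Minimal sketch of the Pool.imap + progress-iterator pattern
# (illustrative stand-ins only, not Sphinx APIs).
from multiprocessing.pool import Pool


def process_chunk(docs: list[str]) -> tuple[list[str], bytes]:
    # Stand-in for read_process()/write_process(): do the work in the child
    # process and return something picklable for the parent to consume.
    payload = ', '.join(docs).encode()
    return docs, payload


def main() -> None:
    chunks = [['doc_a', 'doc_b'], ['doc_c', 'doc_d'], ['doc_e']]
    progress = iter(range(len(chunks)))  # stand-in for status_iterator()

    with Pool(processes=2) as pool:
        # imap() yields results in submission order as workers finish
        results = pool.imap(process_chunk, chunks)
        for (docs, payload), _ in zip(results, progress, strict=True):
            # merge into the parent's state here, serially, while the pool
            # keeps working on the remaining chunks
            print(f'merged {docs} ({len(payload)} bytes)')


if __name__ == '__main__':
    main()

The main behavioural difference from ParallelTasks is that the merge step now runs inline in the parent loop as each result arrives, rather than in a completion callback, and the with-block replaces the explicit tasks.join().
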
6 changes: 1 addition & 5 deletions sphinx/util/parallel.py
@@ -5,7 +5,7 @@
 import os
 import time
 import traceback
 from math import sqrt
Check failure · GitHub Actions / ruff · Ruff (F401): sphinx/util/parallel.py:8:18: `math.sqrt` imported but unused
 from typing import TYPE_CHECKING

 try:
@@ -157,11 +157,7 @@
     # determine how many documents to read in one go
     nargs = len(arguments)
     chunksize = nargs // nproc
-    if chunksize >= maxbatch:
-        # try to improve batch size vs. number of batches
-        chunksize = int(sqrt(nargs / nproc * maxbatch))
-    if chunksize == 0:
-        chunksize = 1
+    chunksize = max(min(chunksize, maxbatch), 1)
     nchunks, rest = divmod(nargs, chunksize)
     if rest:
         nchunks += 1
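
For context, the replaced branch in make_chunks() reduces to clamping the naive nargs // nproc chunk size into the range [1, maxbatch], dropping the sqrt()-based batch-size heuristic. A small illustrative sketch of the new rule (not Sphinx code; maxbatch defaults to 10 in sphinx.util.parallel):

# The simplified chunk-size rule: nargs // nproc, clamped to [1, maxbatch].
def chunk_size(nargs: int, nproc: int, maxbatch: int = 10) -> int:
    return max(min(nargs // nproc, maxbatch), 1)


assert chunk_size(nargs=8, nproc=4) == 2     # a couple of documents per chunk
assert chunk_size(nargs=3, nproc=8) == 1     # never fewer than one document
assert chunk_size(nargs=500, nproc=4) == 10  # large builds are capped at maxbatch

Compared with the old heuristic, large builds now always get maxbatch-sized chunks (more, smaller chunks) instead of the sqrt-scaled batch size.
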