
Commit a16095e

Increase potential max workers and allow connection pool to increase to match.
1 parent 80fb9ef commit a16095e

2 files changed: 39 additions & 13 deletions

kolibri/core/content/utils/resource_import.py

Lines changed: 23 additions & 0 deletions

@@ -282,6 +282,10 @@ def run_import(self):
         # Allow for two open file descriptors per download:
         # The temporary download file that the file is streamed to initially, and then
         # the actual destination file that it is moved to.
+        # Note that with the possibility of a chunked file download,
+        # the true number of file descriptors used may be higher,
+        # but this is unlikely to be a problem in practice, and we build in extra tolerance
+        # in the fd_safe_executor max worker calculation.
         with fd_safe_executor(fds_per_task=2) as executor:
             self.executor = executor
             batch_size = 100

@@ -392,8 +396,27 @@ def __init__(
         )
 
         self.session = requests.Session()
+        # Because we create the executor in the run method, we need to
+        # mount the adapter in the create_file_transfer method
+        # so that we can introspect the executor to configure the pool correctly.
+        self._adapter_mounted = False
+
+    def _mount_adapter(self):
+        if not self._adapter_mounted:
+            # If we are using a ThreadPoolExecutor, then we need to make sure
+            # that the requests session has enough connections to handle
+            # the number of threads.
+            max_workers = self.executor._max_workers
+            adapter = requests.adapters.HTTPAdapter(
+                pool_connections=max_workers,
+                pool_maxsize=max_workers,
+            )
+            self.session.mount("http://", adapter)
+            self.session.mount("https://", adapter)
+            self._adapter_mounted = True
 
     def create_file_transfer(self, f, filename, dest):
+        self._mount_adapter()
         url = paths.get_content_storage_remote_url(filename, baseurl=self.baseurl)
         return transfer.FileDownload(
             url,

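Why the pool needs to grow to match: requests' HTTPAdapter defaults to a connection pool of 10 per host (urllib3's DEFAULT_POOLSIZE), so with up to 50 worker threads sharing one Session against a single remote, any connection beyond the pool size would be discarded after use and reopened on the next request. Below is a minimal standalone sketch of the same pattern, assuming nothing from the Kolibri codebase; the make_session and fetch helpers and the URL list are illustrative only.

    import concurrent.futures

    import requests
    import requests.adapters


    def make_session(max_workers):
        # Mirror _mount_adapter above: size both the number of cached
        # host pools and the per-host pool to the worker count.
        session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=max_workers,
            pool_maxsize=max_workers,
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session


    def fetch(session, url):
        # Worker threads share the session, checking connections out
        # of the pool instead of opening a fresh one per request.
        response = session.get(url, timeout=10)
        response.raise_for_status()
        return len(response.content)


    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        session = make_session(executor._max_workers)
        urls = ["https://example.com/"] * 10  # stand-in download URLs
        sizes = list(executor.map(lambda url: fetch(session, url), urls))

Mounting lazily from create_file_transfer, rather than in __init__, sidesteps the ordering problem the new comment describes: the executor does not exist until the run method creates it, so its _max_workers count cannot be read any earlier.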
kolibri/core/tasks/utils.py

Lines changed: 16 additions & 13 deletions

@@ -358,12 +358,16 @@ def fd_safe_executor(fds_per_task=2):
         else concurrent.futures.ThreadPoolExecutor
     )
 
-    max_workers = 10
+    max_workers = 50
 
-    if not use_multiprocessing:
-        # If we're not using multiprocessing for workers, we may need
-        # to limit the number of workers depending on the number of allowed
-        # file descriptors.
+    # We may need to limit the number of workers depending
+    # on the number of allowed file descriptors.
+
+    if conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"]:
+        # If we are using multiprocessing, then file descriptors are not shared.
+        # So we can use all the available file descriptors for this task.
+        max_descriptors_per_task = get_fd_limit()
+    else:
         # This is a heuristic method, where we know there can be issues if
         # the max number of file descriptors for a process is 256, and we use 10
         # workers, with potentially 4 concurrent tasks downloading files.

@@ -376,12 +380,11 @@ def fd_safe_executor(fds_per_task=2):
         max_descriptors_per_task = (
             get_fd_limit() - server_reserved_fd_count
         ) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"]
-        # Each task only needs to have a maximum of `fds_per_task` open file descriptors at once.
-        # To add tolerance, we divide the number of file descriptors that could be allocated to
-        # this task by double this number which should give us leeway in case of unforeseen
-        # descriptor use during the process.
-        max_workers = min(
-            max_workers, max(1, max_descriptors_per_task // (fds_per_task * 2))
-        )
-
+    # Each task only needs to have a maximum of `fds_per_task` open file descriptors at once.
+    # To add tolerance, we divide the number of file descriptors that could be allocated to
+    # this task by double this number which should give us leeway in case of unforeseen
+    # descriptor use during the process.
+    max_workers = min(
+        max_workers, max(1, max_descriptors_per_task // (fds_per_task * 2))
+    )
     return executor(max_workers=max_workers)

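To make the heuristic concrete, here is the same arithmetic run with assumed inputs; the descriptor limit of 256 comes from the macOS example in the comments above, while the reserved-descriptor count and the REGULAR_PRIORITY_WORKERS value are placeholders, since their real values are defined outside this diff.

    # Worked example of the fd_safe_executor cap under assumed values.
    fd_limit = 256  # stand-in for get_fd_limit(); macOS-style soft limit
    server_reserved_fd_count = 64  # assumed; defined outside this diff
    regular_priority_workers = 4  # assumed value of the Tasks option
    fds_per_task = 2  # temporary download file + destination file

    # Thread pool branch: descriptors are shared across this process,
    # so divide what is left after the server's reservation among the
    # regular-priority workers.
    max_descriptors_per_task = (
        fd_limit - server_reserved_fd_count
    ) / regular_priority_workers  # (256 - 64) / 4 = 48.0

    # Divide by double fds_per_task for tolerance, keep at least one
    # worker, and never exceed the new ceiling of 50.
    max_workers = min(50, max(1, int(max_descriptors_per_task // (fds_per_task * 2))))
    print(max_workers)  # 12 -- here the descriptor limit binds, not the ceiling

Under the multiprocessing branch the whole limit is available to the task: max_descriptors_per_task would be 256, 256 // 4 gives 64 candidate workers, and the ceiling of 50 binds instead.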