9 changes: 7 additions & 2 deletions kolibri/core/content/test/test_import_export.py
@@ -1125,7 +1125,10 @@ def test_remote_import_no_space_after_first_download(
2201062 + 336974,
)
get_free_space_mock.side_effect = [100000000000, 0, 0, 0, 0, 0, 0]
with self.assertRaises(InsufficientStorageSpaceError):
# Ensure single-threaded operation for deterministic testing
with patch(
"kolibri.core.tasks.utils.get_fd_limit", return_value=1
), self.assertRaises(InsufficientStorageSpaceError):
manager = RemoteChannelResourceImportManager(self.the_channel_id)
manager.run()
self.annotation_mock.set_content_visibility.assert_called_with(
@@ -1466,7 +1469,9 @@ def test_remote_import_full_import(
10,
)
manager = RemoteChannelResourceImportManager(self.the_channel_id)
manager.run()
# Ensure single-threaded operation for deterministic testing
with patch("kolibri.core.tasks.utils.get_fd_limit", return_value=1):
manager.run()
self.annotation_mock.set_content_visibility.assert_called_with(
self.the_channel_id,
[
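The patched get_fd_limit drives fd_safe_executor down to a single worker, so downloads run serially and the get_free_space_mock side_effect values are consumed in a predictable order. A minimal standalone sketch of why a one-worker pool makes side_effect consumption deterministic (illustrative names, not the actual test):

from concurrent.futures import ThreadPoolExecutor
from unittest.mock import MagicMock

# Mimics get_free_space: plenty of space for the first check, none afterwards.
free_space = MagicMock(side_effect=[100000000000, 0, 0, 0])

def download(i):
    # With max_workers=1 the calls happen strictly in submission order, so the
    # first download always sees the large value and every later one sees 0.
    return free_space()

with ThreadPoolExecutor(max_workers=1) as executor:
    results = list(executor.map(download, range(4)))

assert results == [100000000000, 0, 0, 0]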
23 changes: 23 additions & 0 deletions kolibri/core/content/utils/resource_import.py
@@ -282,6 +282,10 @@ def run_import(self):
# Allow for two open file descriptors per download:
# The temporary download file that the file is streamed to initially, and then
# the actual destination file that it is moved to.
# Note that with the possibility of a chunked file download,
# the true number of file descriptors used may be higher,
# but this is unlikely to be a problem in practice, and we build in extra tolerance
# in the fd_safe_executor max worker calculation.
with fd_safe_executor(fds_per_task=2) as executor:
self.executor = executor
batch_size = 100
@@ -392,8 +396,27 @@ def __init__(
)

self.session = requests.Session()
# Because the executor is only created in the run method, we defer mounting
# the adapter until create_file_transfer is called, so that we can introspect
# the executor and size the connection pool correctly.
self._adapter_mounted = False

def _mount_adapter(self):
if not self._adapter_mounted:
# If we are using a ThreadPoolExecutor, then we need to make sure
# that the requests session has enough connections to handle
# the number of threads.
max_workers = self.executor._max_workers
adapter = requests.adapters.HTTPAdapter(
pool_connections=max_workers,
pool_maxsize=max_workers,
)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
self._adapter_mounted = True

def create_file_transfer(self, f, filename, dest):
self._mount_adapter()
url = paths.get_content_storage_remote_url(filename, baseurl=self.baseurl)
return transfer.FileDownload(
url,
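Sizing the requests connection pool to the executor's worker count matters because urllib3's default pool keeps only 10 connections per host; with more worker threads than that, connections get discarded and re-opened and urllib3 logs "Connection pool is full" warnings. A minimal sketch of the pattern, assuming max_workers comes from the executor that issues the downloads:

import requests
from requests.adapters import HTTPAdapter

def make_download_session(max_workers):
    session = requests.Session()
    adapter = HTTPAdapter(
        # One pooled connection per worker thread, so concurrent downloads
        # reuse connections instead of evicting each other's.
        pool_connections=max_workers,
        pool_maxsize=max_workers,
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session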
29 changes: 16 additions & 13 deletions kolibri/core/tasks/utils.py
@@ -358,12 +358,16 @@ def fd_safe_executor(fds_per_task=2):
else concurrent.futures.ThreadPoolExecutor
)

max_workers = 10
max_workers = 50

if not use_multiprocessing:
# If we're not using multiprocessing for workers, we may need
# to limit the number of workers depending on the number of allowed
# file descriptors.
# We may need to limit the number of workers depending
# on the number of allowed file descriptors.

if conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"]:
# If we are using multiprocessing, then file descriptors are not shared.
# So we can use all the available file descriptors for this task.
max_descriptors_per_task = get_fd_limit()
else:
# This is a heuristic method, where we know there can be issues if
# the max number of file descriptors for a process is 256, and we use 10
# workers, with potentially 4 concurrent tasks downloading files.
@@ -376,12 +380,11 @@
max_descriptors_per_task = (
get_fd_limit() - server_reserved_fd_count
) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"]
# Each task only needs to have a maximum of `fds_per_task` open file descriptors at once.
# To add tolerance, we divide the number of file descriptors that could be allocated to
# this task by double this number which should give us leeway in case of unforeseen
# descriptor use during the process.
max_workers = min(
max_workers, min(1, max_descriptors_per_task // (fds_per_task * 2))
)

# Each task only needs to have a maximum of `fds_per_task` open file descriptors at once.
# To add tolerance, we divide the number of file descriptors that could be allocated to
# this task by double this number, which should give us leeway in case of unforeseen
# descriptor use during the process.
max_workers = min(
max_workers, max(1, max_descriptors_per_task // (fds_per_task * 2))
)
return executor(max_workers=max_workers)
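As a rough worked example of the heuristic (the numbers are illustrative): with a file descriptor limit of 256, 64 descriptors reserved for the server, and 4 regular-priority workers, each task gets (256 - 64) / 4 = 48 descriptors; dividing by fds_per_task * 2 = 4 and clamping between 1 and 50 gives 12 workers. A standalone sketch, assuming those constants:

def fd_safe_worker_count(fd_limit, reserved_for_server=64, concurrent_tasks=4,
                         fds_per_task=2, hard_cap=50):
    # Descriptors available to this task after the server's share is set aside.
    per_task_budget = (fd_limit - reserved_for_server) / concurrent_tasks
    # Divide by double fds_per_task to leave slack for unforeseen descriptor use,
    # then clamp to at least one worker and at most the hard cap.
    return int(min(hard_cap, max(1, per_task_budget // (fds_per_task * 2))))

assert fd_safe_worker_count(256) == 12
assert fd_safe_worker_count(1) == 1      # e.g. the value patched in the tests above
assert fd_safe_worker_count(4096) == 50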
19 changes: 14 additions & 5 deletions kolibri/core/tasks/worker.py
@@ -7,18 +7,25 @@
from kolibri.core.tasks.storage import Storage
from kolibri.core.tasks.utils import db_connection
from kolibri.core.tasks.utils import InfiniteLoopThread
from kolibri.utils.logger import setup_worker_logging
from kolibri.utils.multiprocessing_compat import PoolExecutor

logger = logging.getLogger(__name__)


def _init_worker(log_queue):
"""
Initialize worker process, setting up logging to use the given log queue.
"""
setup_worker_logging(log_queue)


def execute_job(
job_id,
worker_host=None,
worker_process=None,
worker_thread=None,
worker_extra=None,
log_queue=None,
):
"""
Call the function stored in the job.func.
@@ -41,7 +48,7 @@ def execute_job(
django_connection.close()


def execute_job_with_python_worker(job_id, log_queue=None):
def execute_job_with_python_worker(job_id):
"""
Call execute_job but additionally with the current host, process and thread information taken
directly from python internals.
@@ -55,7 +62,6 @@ def execute_job_with_python_worker(job_id):
worker_host=socket.gethostname(),
worker_process=str(os.getpid()),
worker_thread=str(threading.get_ident()),
log_queue=log_queue,
)


@@ -105,7 +111,11 @@ def shutdown_workers(self, wait=True):
self.workers.shutdown(wait=wait)

def start_workers(self):
pool = PoolExecutor(max_workers=self.max_workers)
pool = PoolExecutor(
max_workers=self.max_workers,
initializer=_init_worker,
initargs=(self.log_queue,),
)
return pool

def handle_finished_future(self, future):
@@ -199,7 +209,6 @@ def start_next_job(self, job):
future = self.workers.submit(
execute_job_with_python_worker,
job_id=job.job_id,
log_queue=self.log_queue,
)

# Check if the job ID already exists in the future_job_mapping dictionary
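Passing initializer/initargs means each pool worker configures its logging once at startup, rather than log_queue being threaded through every job submission as before. A minimal, self-contained sketch of the same pattern for a multiprocessing pool (names here are illustrative, not Kolibri's own):

import logging
import logging.handlers
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def init_worker(log_queue):
    # Route all records from this worker process to the parent via the queue.
    root = logging.getLogger()
    root.handlers = [logging.handlers.QueueHandler(log_queue)]
    root.setLevel(logging.INFO)

def do_work(n):
    logging.getLogger("demo").info("processed item %d", n)
    return n * n

if __name__ == "__main__":
    log_queue = multiprocessing.Manager().Queue()
    listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
    listener.start()
    with ProcessPoolExecutor(
        max_workers=2, initializer=init_worker, initargs=(log_queue,)
    ) as pool:
        print(list(pool.map(do_work, range(4))))
    listener.stop()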
10 changes: 9 additions & 1 deletion kolibri/utils/logger.py
@@ -52,7 +52,15 @@ def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
)
record.exc_info = None
if hasattr(record, "args"):
record.args = tuple(str(arg) for arg in record.args)
# Stringify args that may not survive pickling across process boundaries
safe_args = []
for arg in record.args:
# Keep numeric types as-is for format compatibility
if isinstance(arg, (int, float, bool, type(None))):
safe_args.append(arg)
else:
safe_args.append(str(arg))
record.args = tuple(safe_args)

record = super().prepare(record)
record._logger_name = self.logger_name
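Records handed to a queue for another process must be picklable, and lazy %-formatting keeps the original args on the record: stringifying everything (the previous behaviour) breaks numeric format specifiers such as %d, while leaving arbitrary objects in place can make the record unpicklable. A standalone sketch of the trade-off, with illustrative names:

import logging
import pickle
import threading

def make_args_pickle_safe(record):
    if record.args:
        record.args = tuple(
            # Numbers, bools and None stay as-is so %d / %.2f keep working;
            # anything else is stringified so the record can cross processes.
            arg if isinstance(arg, (int, float, bool, type(None))) else str(arg)
            for arg in record.args
        )
    return record

record = logging.LogRecord(
    "demo", logging.INFO, __file__, 1,
    "imported %d files while holding %s", (42, threading.Lock()), None,
)
# Without the conversion, pickle.dumps(record) raises TypeError on the lock.
restored = pickle.loads(pickle.dumps(make_args_pickle_safe(record)))
print(restored.getMessage())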
2 changes: 1 addition & 1 deletion kolibri/utils/options.py
@@ -796,7 +796,7 @@ def csp_source_list(value):
"Tasks": {
"USE_WORKER_MULTIPROCESSING": {
"type": "multiprocess_bool",
"default": False,
"default": True,
"description": """
Whether to use Python multiprocessing for worker pools. If False, then it will use threading. This may be useful
if running on a dedicated device with multiple cores where many asynchronous tasks are run.
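With the default flipped to True, worker pools now run as separate processes unless the option is overridden. A minimal sketch of what the flag toggles, assuming an interface similar to (but not identical to) kolibri.utils.multiprocessing_compat.PoolExecutor:

import concurrent.futures

def make_pool_executor(use_multiprocessing, max_workers, initializer=None, initargs=()):
    # Process pools isolate file descriptors and logging per worker;
    # thread pools share them within a single process.
    cls = (
        concurrent.futures.ProcessPoolExecutor
        if use_multiprocessing
        else concurrent.futures.ThreadPoolExecutor
    )
    return cls(max_workers=max_workers, initializer=initializer, initargs=initargs)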