Fix an issue of releasing lock for rq export job when the worker subprocess is killed (#8721)

The main problem fixed by this PR is as follows:
In the previous implementation, "long" locks were used when exporting a
resource or deleting an export cache.
If the export process was killed (e.g., by the OOM killer with signal 9),
the acquired lock was not released and remained active until the
auto-release timeout expired (e.g., 4 hours). A subsequent user request to
export a dataset could not acquire the lock, so the job was scheduled for
execution 60 seconds later (the default value). When the scheduled job ran,
it still could not acquire the lock, and the whole cycle repeated.
Additionally, if a user initiated an export after the job was marked as
scheduled, they could not re-initiate the process and received an error,
because the RQ job status was not set and handled correctly (it remained
`STARTED`).
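
For illustration, here is a minimal, hypothetical sketch of the idea behind the fix (this is not the CVAT implementation; the key name, timings, and helper are made up), using redis-py's built-in lock: if the worker holding the lock is killed with SIGKILL and never calls `release()`, the lock simply expires after its TTL, so the next export attempt is delayed by seconds rather than hours.

```python
# Hypothetical sketch, not the CVAT code: a lock with a short TTL and a bounded
# acquisition timeout. If the owning worker is SIGKILLed and never releases the
# lock, Redis drops the key after `timeout` seconds, so the next export request
# is delayed by seconds rather than hours.
from redis import Redis

redis_client = Redis()

def acquire_export_lock(key: str, ttl: float, acquire_timeout: float):
    # redis-py's Lock auto-expires after `timeout` seconds and raises
    # redis.exceptions.LockError if it cannot be acquired within `blocking_timeout`.
    return redis_client.lock(key, timeout=ttl, blocking_timeout=acquire_timeout)

with acquire_export_lock("export:task-42", ttl=30, acquire_timeout=50):
    pass  # check the cache or publish the export file here
```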

Another problem found and fixed: two users with the rights to export a
resource could not run the export in parallel (with the same options, such as
format and save_images); one of them received a `LockNotAvailableError`.
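
The reworked flow, sketched below, holds the lock only around the cheap cache check and the final publish step, so two users exporting the same resource with the same options no longer collide for the duration of the whole export. This is a simplified sketch; `LOCK_TTL`, `ACQUIRE_TIMEOUT`, and the `do_export` callback are illustrative placeholders, and the actual changes appear in `cvat/apps/dataset_manager/views.py` further down.

```python
# Simplified sketch of the new export flow; LOCK_TTL, ACQUIRE_TIMEOUT and
# do_export are placeholders, not names taken from the diff.
import os
import tempfile
from datetime import timedelta

from cvat.apps.dataset_manager.util import get_export_cache_lock

LOCK_TTL = timedelta(seconds=30)
ACQUIRE_TIMEOUT = timedelta(seconds=50)

def export_with_short_locks(output_path: str, cache_dir: str, do_export) -> str:
    # 1. Short lock: if another export already produced the file, reuse it.
    with get_export_cache_lock(output_path, ttl=LOCK_TTL, acquire_timeout=ACQUIRE_TIMEOUT):
        if os.path.exists(output_path):
            return output_path

    # 2. The expensive export runs without holding the lock, so concurrent
    #    exports of the same resource do not block each other.
    with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
        temp_file = os.path.join(temp_dir, "result")
        do_export(temp_file)

        # 3. Short lock again: publish the result atomically.
        with get_export_cache_lock(output_path, ttl=LOCK_TTL, acquire_timeout=ACQUIRE_TIMEOUT):
            os.replace(temp_file, output_path)

    return output_path
```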

---------

Co-authored-by: Maxim Zhiltsov <[email protected]>
Marishka17 and zhiltsov-max authored Dec 19, 2024
1 parent c4504dc commit a9ea512
Showing 11 changed files with 609 additions and 340 deletions.
@@ -0,0 +1,6 @@
### Fixed

- Exporting datasets could start significantly later than expected, both for a single
user and for several users in the same project/task/job (<https://github.com/cvat-ai/cvat/pull/8721>)
- Scheduled RQ jobs could not be restarted due to incorrect RQ job status
updating and handling (<https://github.com/cvat-ai/cvat/pull/8721>)
9 changes: 0 additions & 9 deletions cvat/apps/dataset_manager/apps.py
@@ -7,12 +7,3 @@

class DatasetManagerConfig(AppConfig):
name = "cvat.apps.dataset_manager"

def ready(self) -> None:
from django.conf import settings

from . import default_settings

for key in dir(default_settings):
if key.isupper() and not hasattr(settings, key):
setattr(settings, key, getattr(default_settings, key))
14 changes: 0 additions & 14 deletions cvat/apps/dataset_manager/default_settings.py

This file was deleted.

635 changes: 400 additions & 235 deletions cvat/apps/dataset_manager/tests/test_rest_api_formats.py

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions cvat/apps/dataset_manager/util.py
@@ -111,14 +111,16 @@ def get_export_cache_lock(
*,
ttl: int | timedelta,
block: bool = True,
acquire_timeout: Optional[int | timedelta] = None,
acquire_timeout: int | timedelta,
) -> Generator[Lock, Any, Any]:
assert acquire_timeout is not None, "Endless waiting for the lock should be avoided"

if isinstance(acquire_timeout, timedelta):
acquire_timeout = acquire_timeout.total_seconds()
if acquire_timeout is not None and acquire_timeout < 0:

if acquire_timeout < 0:
raise ValueError("acquire_timeout must be a non-negative number")
elif acquire_timeout is None:
acquire_timeout = -1


if isinstance(ttl, timedelta):
ttl = ttl.total_seconds()
@@ -233,3 +235,9 @@ def parse_export_file_path(file_path: os.PathLike[str]) -> ParsedExportFilename:
format_repr=basename_match.group('format_tag'),
file_ext=basename_match.group('file_ext'),
)

def extend_export_file_lifetime(file_path: str):
# Update the last modification time to extend the export's lifetime,
# as the last access time is not available on every filesystem.
# As a result, file deletion by the cleaning job will be postponed.
os.utime(file_path, None)
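
As the hunk above shows, `acquire_timeout` is now a required argument, so callers must bound how long they wait for the lock. A hedged usage sketch follows (the path and timeout values are examples only; `LockNotAvailableError` comes from the same `util` module and is handled in `views.py` in this PR):

```python
# Usage sketch for the updated helper; path and timings are illustrative.
from datetime import timedelta

from cvat.apps.dataset_manager.util import LockNotAvailableError, get_export_cache_lock

try:
    with get_export_cache_lock(
        "/home/django/data/cache/export/instance-42.zip",  # export file path, used as the lock key
        ttl=timedelta(seconds=30),              # lock expires even if the worker is killed
        acquire_timeout=timedelta(seconds=50),  # never wait for the lock indefinitely
    ):
        pass  # check for, refresh, or remove the cached export file
except LockNotAvailableError:
    pass  # e.g. reschedule the RQ job and retry later
```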
138 changes: 84 additions & 54 deletions cvat/apps/dataset_manager/views.py
@@ -11,6 +11,7 @@

import django_rq
import rq
from os.path import exists as osp_exists
from django.conf import settings
from django.utils import timezone
from rq_scheduler import Scheduler
@@ -27,21 +28,23 @@
LockNotAvailableError,
current_function_name, get_export_cache_lock,
get_export_cache_dir, make_export_filename,
parse_export_file_path
parse_export_file_path, extend_export_file_lifetime
)
from .util import EXPORT_CACHE_DIR_NAME # pylint: disable=unused-import


slogger = ServerLogManager(__name__)

_MODULE_NAME = __package__ + '.' + osp.splitext(osp.basename(__file__))[0]
def log_exception(logger=None, exc_info=True):

def log_exception(logger: logging.Logger | None = None, exc_info: bool = True):
if logger is None:
logger = slogger
logger = slogger.glob
logger.exception("[%s @ %s]: exception occurred" % \
(_MODULE_NAME, current_function_name(2)),
exc_info=exc_info)

DEFAULT_CACHE_TTL = timedelta(seconds=settings.DATASET_CACHE_TTL)
DEFAULT_CACHE_TTL = timedelta(seconds=settings.EXPORT_CACHE_TTL)
PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL
TASK_CACHE_TTL = DEFAULT_CACHE_TTL
JOB_CACHE_TTL = DEFAULT_CACHE_TTL
@@ -51,8 +54,9 @@ def log_exception(logger=None, exc_info=True):
'job': JOB_CACHE_TTL,
}

EXPORT_CACHE_LOCK_TIMEOUT = timedelta(seconds=settings.DATASET_CACHE_LOCK_TIMEOUT)
EXPORT_LOCKED_RETRY_INTERVAL = timedelta(seconds=settings.DATASET_EXPORT_LOCKED_RETRY_INTERVAL)
EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = timedelta(seconds=settings.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT)
EXPORT_CACHE_LOCK_TTL = timedelta(seconds=settings.EXPORT_CACHE_LOCK_TTL)
EXPORT_LOCKED_RETRY_INTERVAL = timedelta(seconds=settings.EXPORT_LOCKED_RETRY_INTERVAL)


def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta:
@@ -61,6 +65,14 @@ def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta:

return TTL_CONSTS[db_instance.lower()]

def _patch_scheduled_job_status(job: rq.job.Job):
# NOTE: rq scheduler < 0.14 does not set the appropriate
# job status (SCHEDULED). This has been fixed in the 0.14 version.
# https://github.com/rq/rq-scheduler/blob/f7d5787c5f94b5517e209c612ef648f4bfc44f9e/rq_scheduler/scheduler.py#L148
# FUTURE-TODO: delete manual status setting after upgrading to 0.14
if job.get_status(refresh=False) != rq.job.JobStatus.SCHEDULED:
job.set_status(rq.job.JobStatus.SCHEDULED)

def _retry_current_rq_job(time_delta: timedelta) -> rq.job.Job:
# TODO: implement using retries once we move from rq_scheduler to builtin RQ scheduler
# for better reliability and error reporting
@@ -79,7 +91,7 @@ def _patched_retry(*_1, **_2):
user_id = current_rq_job.meta.get('user', {}).get('id') or -1

with get_rq_lock_by_user(settings.CVAT_QUEUES.EXPORT_DATA.value, user_id):
scheduler.enqueue_in(
scheduled_rq_job: rq.job.Job = scheduler.enqueue_in(
time_delta,
current_rq_job.func,
*current_rq_job.args,
@@ -92,12 +104,21 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
on_success=current_rq_job.success_callback,
on_failure=current_rq_job.failure_callback,
)
_patch_scheduled_job_status(scheduled_rq_job)

current_rq_job.retries_left = 1
setattr(current_rq_job, 'retry', _patched_retry)
return current_rq_job

def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
def export(
*,
dst_format: str,
project_id: int | None = None,
task_id: int | None = None,
job_id: int | None = None,
server_url: str | None = None,
save_images: bool = False,
):
try:
if task_id is not None:
logger = slogger.task[task_id]
@@ -134,41 +155,50 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No

os.makedirs(cache_dir, exist_ok=True)

# acquire a lock 2 times instead of using one long lock:
# 1. to check whether the file exists or not
# 2. to create a file when it doesn't exist
with get_export_cache_lock(
output_path,
block=True,
acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
ttl=rq.get_current_job().timeout,
ttl=EXPORT_CACHE_LOCK_TTL,
acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
):
if not osp.exists(output_path):
with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
temp_file = osp.join(temp_dir, 'result')
export_fn(db_instance.id, temp_file, dst_format,
server_url=server_url, save_images=save_images)
os.replace(temp_file, output_path)

scheduler: Scheduler = django_rq.get_scheduler(
settings.CVAT_QUEUES.EXPORT_DATA.value
)
cleaning_job = scheduler.enqueue_in(
time_delta=cache_ttl,
func=clear_export_cache,
file_path=output_path,
file_ctime=instance_update_time.timestamp(),
logger=logger
)
logger.info(
"The {} '{}' is exported as '{}' at '{}' "
"and available for downloading for the next {}. "
"Export cache cleaning job is enqueued, id '{}'".format(
db_instance.__class__.__name__.lower(),
db_instance.name if isinstance(
db_instance, (Project, Task)
) else db_instance.id,
dst_format, output_path, cache_ttl,
cleaning_job.id
)
)
if osp_exists(output_path):
extend_export_file_lifetime(output_path)
return output_path

with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
temp_file = osp.join(temp_dir, 'result')
export_fn(db_instance.id, temp_file, dst_format,
server_url=server_url, save_images=save_images)
with get_export_cache_lock(
output_path,
ttl=EXPORT_CACHE_LOCK_TTL,
acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
):
os.replace(temp_file, output_path)

scheduler: Scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
cleaning_job = scheduler.enqueue_in(
time_delta=cache_ttl,
func=clear_export_cache,
file_path=output_path,
file_ctime=instance_update_time.timestamp(),
logger=logger,
)
_patch_scheduled_job_status(cleaning_job)
logger.info(
"The {} '{}' is exported as '{}' at '{}' "
"and available for downloading for the next {}. "
"Export cache cleaning job is enqueued, id '{}'".format(
db_instance.__class__.__name__.lower(),
db_instance.id,
dst_format,
output_path,
cache_ttl,
cleaning_job.id,
)
)

return output_path
except LockNotAvailableError:
@@ -184,23 +214,23 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
log_exception(logger)
raise

def export_job_annotations(job_id, dst_format=None, server_url=None):
return export(dst_format,job_id=job_id, server_url=server_url, save_images=False)
def export_job_annotations(job_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, job_id=job_id, server_url=server_url, save_images=False)

def export_job_as_dataset(job_id, dst_format=None, server_url=None):
return export(dst_format, job_id=job_id, server_url=server_url, save_images=True)
def export_job_as_dataset(job_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, job_id=job_id, server_url=server_url, save_images=True)

def export_task_as_dataset(task_id, dst_format=None, server_url=None):
return export(dst_format, task_id=task_id, server_url=server_url, save_images=True)
def export_task_as_dataset(task_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, task_id=task_id, server_url=server_url, save_images=True)

def export_task_annotations(task_id, dst_format=None, server_url=None):
return export(dst_format,task_id=task_id, server_url=server_url, save_images=False)
def export_task_annotations(task_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, task_id=task_id, server_url=server_url, save_images=False)

def export_project_as_dataset(project_id, dst_format=None, server_url=None):
return export(dst_format, project_id=project_id, server_url=server_url, save_images=True)
def export_project_as_dataset(project_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=True)

def export_project_annotations(project_id, dst_format=None, server_url=None):
return export(dst_format, project_id=project_id, server_url=server_url, save_images=False)
def export_project_annotations(project_id: int, dst_format: str, *, server_url: str | None = None):
return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=False)


class FileIsBeingUsedError(Exception):
@@ -213,8 +243,8 @@ def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger
with get_export_cache_lock(
file_path,
block=True,
acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
ttl=rq.get_current_job().timeout,
acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
ttl=EXPORT_CACHE_LOCK_TTL,
):
if not osp.exists(file_path):
raise FileNotFoundError("Export cache file '{}' doesn't exist".format(file_path))