Skip to content

Commit

Permalink
Support redis migrations (#8898)
Browse files Browse the repository at this point in the history
Sometimes we need to modify the data stored in Redis (for instance, in
the near future we will change the identifiers of RQ jobs). This PR
introduces a common mechanism for handling Redis migrations.
  • Loading branch information
Marishka17 authored Feb 4, 2025
1 parent 74b14c5 commit a1dc585
Show file tree
Hide file tree
Showing 16 changed files with 292 additions and 12 deletions.
13 changes: 12 additions & 1 deletion backend_entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ wait_for_db() {
wait-for-it "${CVAT_POSTGRES_HOST}:${CVAT_POSTGRES_PORT:-5432}" -t 0
}

# Block until the in-memory Redis endpoint accepts TCP connections.
# The host comes from CVAT_REDIS_INMEM_HOST; the port comes from
# CVAT_REDIS_INMEM_PORT and falls back to 6379 when unset or empty.
# "-t 0" is passed to wait-for-it unchanged.
wait_for_redis_inmem() {
    local redis_host="${CVAT_REDIS_INMEM_HOST}"
    local redis_port="${CVAT_REDIS_INMEM_PORT:-6379}"

    wait-for-it "${redis_host}:${redis_port}" -t 0
}

# Replace the current process with bash, forwarding every argument
# given to this function unchanged.
cmd_bash() {
exec bash "$@"
}
Expand All @@ -19,7 +23,8 @@ cmd_init() {
wait_for_db
~/manage.py migrate

wait-for-it "${CVAT_REDIS_INMEM_HOST}:${CVAT_REDIS_INMEM_PORT:-6379}" -t 0
wait_for_redis_inmem
~/manage.py migrateredis
~/manage.py syncperiodicjobs
}

Expand All @@ -39,6 +44,12 @@ cmd_run() {
sleep 10
done

wait_for_redis_inmem
echo "waiting for Redis migrations to complete..."
while ! ~/manage.py migrateredis --check; do
sleep 10
done

exec supervisord -c "supervisord/$1.conf"
}

Expand Down
4 changes: 4 additions & 0 deletions changelog.d/20250117_174701_maria_redis_migrations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Added

- Support for managing Redis migrations
(<https://github.com/cvat-ai/cvat/pull/8898>)
20 changes: 20 additions & 0 deletions cvat/apps/engine/redis_migrations/001_cleanup_scheduled_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import django_rq
from django.conf import settings
from rq_scheduler import Scheduler

from cvat.apps.redis_handler.redis_migrations import BaseMigration


class Migration(BaseMigration):
    """Redis migration that removes scheduled export-cache cleanup jobs."""

    @classmethod
    def run(cls):
        """Cancel and delete every scheduled ``clear_export_cache`` job.

        Walks the jobs known to the scheduler of the export queue and, for
        each one whose function name matches the export cache cleanup
        callback, cancels it in the scheduler and deletes the job itself.
        """
        export_queue_name = settings.CVAT_QUEUES.EXPORT_DATA.value
        scheduler: Scheduler = django_rq.get_scheduler(export_queue_name)

        for scheduled_job in scheduler.get_jobs():
            if scheduled_job.func_name != "cvat.apps.dataset_manager.views.clear_export_cache":
                continue

            scheduler.cancel(scheduled_job)
            scheduled_job.delete()
3 changes: 3 additions & 0 deletions cvat/apps/engine/redis_migrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
3 changes: 3 additions & 0 deletions cvat/apps/redis_handler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
10 changes: 10 additions & 0 deletions cvat/apps/redis_handler/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT


from django.apps import AppConfig


class RedisHandlerConfig(AppConfig):
    """Django application config for the ``cvat.apps.redis_handler`` app."""

    name = "cvat.apps.redis_handler"
3 changes: 3 additions & 0 deletions cvat/apps/redis_handler/management/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
3 changes: 3 additions & 0 deletions cvat/apps/redis_handler/management/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
69 changes: 69 additions & 0 deletions cvat/apps/redis_handler/management/commands/migrateredis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import sys
import traceback
from argparse import ArgumentParser

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from redis import Redis

from cvat.apps.redis_handler.migration_loader import AppliedMigration, MigrationLoader


class Command(BaseCommand):
    """Management command that applies pending Redis migrations.

    Without options, every migration reported as unapplied by
    ``MigrationLoader`` is run in order and then recorded through
    ``AppliedMigration.save``. With ``--check`` nothing is applied: the
    command only reports, through its exit status, whether unapplied
    migrations exist.
    """

    help = "Applies Redis migrations and records them in the database"

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Register the ``--check`` flag on the command's argument parser."""
        parser.add_argument(
            "--check",
            action="store_true",
            help="Checks whether Redis migrations have been applied; exits with non-zero status if not",
        )

    def handle(self, *args, **options) -> None:
        """Apply unapplied Redis migrations, or only check for them.

        Raises:
            CommandError: if a migration (or recording it as applied) fails.
                The original exception is attached as ``__cause__``.
            SystemExit: with status 1 when ``--check`` is given and there
                are unapplied migrations.
        """
        conn = Redis(
            host=settings.REDIS_INMEM_SETTINGS["HOST"],
            port=settings.REDIS_INMEM_SETTINGS["PORT"],
            db=settings.REDIS_INMEM_SETTINGS["DB"],
            password=settings.REDIS_INMEM_SETTINGS["PASSWORD"],
        )
        loader = MigrationLoader(connection=conn)

        # The loader's truthiness comes from its __len__: it is falsy when
        # there are no unapplied migrations.
        if options["check"]:
            if not loader:
                return

            sys.exit(1)

        if not loader:
            self.stdout.write("No migrations to apply")
            return

        for migration in loader:
            try:
                migration.run()

                # Record the migration only after run() has finished, so a
                # failed migration is not marked as applied.
                applied_migration = AppliedMigration(
                    name=migration.name,
                    app_label=migration.app_label,
                )
                applied_migration.save(connection=conn)

            except Exception as ex:
                self.stderr.write(
                    self.style.ERROR(
                        f"[{migration.app_label}] Failed to apply migration: {migration.name}"
                    )
                )
                self.stderr.write(self.style.ERROR(f"\n{traceback.format_exc()}"))
                # Chain explicitly so the root cause is preserved as
                # __cause__ instead of an implicit "during handling" context.
                raise CommandError(str(ex)) from ex

            self.stdout.write(
                self.style.SUCCESS(
                    f"[{migration.app_label}] Successfully applied migration: {migration.name}"
                )
            )
126 changes: 126 additions & 0 deletions cvat/apps/redis_handler/migration_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import importlib
from datetime import datetime
from pathlib import Path
from typing import Any, ClassVar

from attrs import field, frozen, validators
from django.apps import AppConfig, apps
from django.utils import timezone
from redis import Redis

from cvat.apps.redis_handler.redis_migrations import BaseMigration


def to_datetime(value: float | str | datetime) -> datetime:
if isinstance(value, datetime):
return value
elif isinstance(value, str):
value = float(value)

return datetime.fromtimestamp(value)


@frozen
class AppliedMigration:
    """Immutable record of a Redis migration that has been applied.

    ``save`` writes two things: a hash stored under
    ``KEY_PREFIX + "<app_label>.<name>"`` holding the application
    timestamp, and a member ``"<app_label>.<name>"`` in the set ``SET_KEY``.
    """

    # Redis set holding the "<app_label>.<name>" key of each saved migration.
    SET_KEY: ClassVar[str] = "cvat:applied_migrations"
    # Prefix of the per-migration hash keys.
    KEY_PREFIX: ClassVar[str] = "cvat:applied_migration:"

    name: str = field(validator=[validators.instance_of(str), validators.max_len(128)])
    app_label: str = field(validator=[validators.instance_of(str), validators.max_len(128)])
    # Defaults to timezone.now(); any supplied value goes through to_datetime.
    applied_date: datetime = field(
        validator=[validators.instance_of(datetime)], converter=to_datetime, factory=timezone.now
    )

    def get_key(self) -> str:
        """Return the ``"<app_label>.<name>"`` identifier of this migration."""
        return ".".join((self.app_label, self.name))

    def get_key_with_prefix(self) -> str:
        """Return the key of the hash that stores this migration's data."""
        return f"{self.KEY_PREFIX}{self.get_key()}"

    def to_dict(self) -> dict[str, Any]:
        """Return the hash payload: the applied date as a POSIX timestamp."""
        applied_timestamp = self.applied_date.timestamp()
        return {"applied_date": applied_timestamp}

    def save(self, *, connection: Redis) -> None:
        """Write the hash and the set membership through a single pipeline."""
        with connection.pipeline() as pipe:
            hash_key = self.get_key_with_prefix()
            payload = self.to_dict()
            pipe.hset(hash_key, mapping=payload)
            pipe.sadd(self.SET_KEY, self.get_key())
            pipe.execute()


class LoaderError(Exception):
    """Raised when a Redis migration module cannot be loaded as a valid migration."""


class MigrationLoader:
    """Discovers Redis migrations on disk and works out which are unapplied.

    On construction the loader:

    1. collects the installed apps whose name starts with ``"cvat"`` and
       that contain a ``redis_migrations`` directory;
    2. lists the migration modules in those directories;
    3. reads the set of already applied migrations from Redis;
    4. instantiates the ``Migration`` class of every migration that is on
       disk but not recorded as applied.

    Iterating the loader yields the unapplied migration instances and
    ``len()`` returns their count, so an "empty" loader is falsy.
    """

    REDIS_MIGRATIONS_DIR_NAME = "redis_migrations"
    REDIS_MIGRATION_CLASS_NAME = "Migration"

    def __init__(self, *, connection: Redis) -> None:
        self._connection = connection
        self._app_config_mapping = {
            app_config.label: app_config for app_config in self._find_app_configs()
        }
        self._disk_migrations_per_app: dict[str, list[str]] = {}
        self._applied_migrations: dict[str, set[str]] = {}
        self._unapplied_migrations: list[BaseMigration] = []

        self._load_from_disk()
        self._init_applied_migrations()
        self._init_unapplied_migrations()

    def _find_app_configs(self) -> list[AppConfig]:
        """Return configs of "cvat*" apps that have a Redis migrations directory."""
        return [
            app_config
            for app_config in apps.get_app_configs()
            if app_config.name.startswith("cvat")
            and (Path(app_config.path) / self.REDIS_MIGRATIONS_DIR_NAME).exists()
        ]

    def _load_from_disk(self) -> None:
        """Collect, per app label, the sorted migration module names found on disk.

        Only files matching ``[0-9]*.py`` (names starting with a digit) are
        considered, which leaves out ``__init__.py``.
        """
        for app_label, app_config in self._app_config_mapping.items():
            migrations_dir = Path(app_config.path) / self.REDIS_MIGRATIONS_DIR_NAME
            for migration_file in sorted(migrations_dir.glob("[0-9]*.py")):
                migration_name = migration_file.stem
                self._disk_migrations_per_app.setdefault(app_label, []).append(migration_name)

    def _init_applied_migrations(self) -> None:
        """Load the applied migration names from Redis, grouped by app label."""
        for raw_key in self._connection.smembers(AppliedMigration.SET_KEY):
            key = raw_key.decode("utf-8")
            # Keys have the form "<app_label>.<migration_name>". Split on the
            # first dot only, so a migration name that itself contains a dot
            # (e.g. a file stem like "002_fix.v2") is not broken apart.
            app_label, migration_name = key.split(".", maxsplit=1)
            self._applied_migrations.setdefault(app_label, set()).add(migration_name)

    def _init_unapplied_migrations(self) -> None:
        """Instantiate, in sorted name order per app, every unapplied migration."""
        for app_label, migration_names in self._disk_migrations_per_app.items():
            app_config = self._app_config_mapping[app_label]
            app_unapplied_migrations = sorted(
                set(migration_names) - self._applied_migrations.get(app_label, set())
            )
            for migration_name in app_unapplied_migrations:
                MigrationClass = self.get_migration_class(app_config.name, migration_name)
                self._unapplied_migrations.append(
                    MigrationClass(migration_name, app_config.label, connection=self._connection)
                )

    def get_migration_class(self, app_name: str, migration_name: str) -> type[BaseMigration]:
        """Import a migration module and return its ``Migration`` class.

        Args:
            app_name: dotted module name of the app (``AppConfig.name``).
            migration_name: migration module name without the ``.py`` suffix.

        Raises:
            LoaderError: if the module has no ``Migration`` attribute, or the
                attribute is not a class derived from ``BaseMigration``.
        """
        migration_module_path = ".".join([app_name, self.REDIS_MIGRATIONS_DIR_NAME, migration_name])
        module = importlib.import_module(migration_module_path)
        MigrationClass = getattr(module, self.REDIS_MIGRATION_CLASS_NAME, None)

        # issubclass() raises TypeError for non-class objects; check for a
        # class first so every invalid module is reported as a LoaderError.
        if not isinstance(MigrationClass, type) or not issubclass(MigrationClass, BaseMigration):
            raise LoaderError(f"Invalid migration: {migration_module_path}")

        return MigrationClass

    def __iter__(self):
        yield from self._unapplied_migrations

    def __len__(self):
        return len(self._unapplied_migrations)
19 changes: 19 additions & 0 deletions cvat/apps/redis_handler/redis_migrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (C) CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

from abc import ABCMeta, abstractmethod

from attrs import define, field, validators
from redis import Redis


@define
class BaseMigration(metaclass=ABCMeta):
    """Abstract base class for Redis migrations.

    Subclasses must implement ``run``.
    """

    # Migration identifier; must be a str.
    name: str = field(validator=[validators.instance_of(str)])
    # Label of the app that owns the migration; must be a str.
    app_label: str = field(validator=[validators.instance_of(str)])
    # Redis client; keyword-only in the generated __init__.
    connection: Redis = field(validator=[validators.instance_of(Redis)], kw_only=True)

    @classmethod
    @abstractmethod
    def run(cls) -> None:
        """Apply the migration.

        NOTE(review): this is declared as a classmethod, so an implementation
        receives the class rather than the instance and cannot read the
        instance's ``connection`` field -- confirm this is intended.
        """
21 changes: 11 additions & 10 deletions cvat/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def generate_secret_key():
'cvat.apps.events',
'cvat.apps.quality_control',
'cvat.apps.analytics_report',
'cvat.apps.redis_handler',
]

SITE_ID = 1
Expand Down Expand Up @@ -284,7 +285,7 @@ class CVAT_QUEUES(Enum):
redis_inmem_port = os.getenv('CVAT_REDIS_INMEM_PORT', 6379)
redis_inmem_password = os.getenv('CVAT_REDIS_INMEM_PASSWORD', '')

shared_queue_settings = {
REDIS_INMEM_SETTINGS = {
'HOST': redis_inmem_host,
'PORT': redis_inmem_port,
'DB': 0,
Expand All @@ -293,39 +294,39 @@ class CVAT_QUEUES(Enum):

# Per-queue RQ configuration. Every queue uses the same in-memory Redis
# connection settings (REDIS_INMEM_SETTINGS) and differs only in its
# default job timeout.
RQ_QUEUES = {
    CVAT_QUEUES.IMPORT_DATA.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '4h',
    },
    CVAT_QUEUES.EXPORT_DATA.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '4h',
    },
    CVAT_QUEUES.AUTO_ANNOTATION.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '24h',
    },
    CVAT_QUEUES.WEBHOOKS.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '1h',
    },
    CVAT_QUEUES.NOTIFICATIONS.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '1h',
    },
    CVAT_QUEUES.QUALITY_REPORTS.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '1h',
    },
    CVAT_QUEUES.ANALYTICS_REPORTS.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '1h',
    },
    CVAT_QUEUES.CLEANING.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '2h',
    },
    CVAT_QUEUES.CHUNKS.value: {
        **REDIS_INMEM_SETTINGS,
        'DEFAULT_TIMEOUT': '5m',
    },
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ description: 'Installing a development environment for different operating syste

```bash
python manage.py migrate
python manage.py migrateredis
python manage.py collectstatic
python manage.py syncperiodicjobs
python manage.py createsuperuser
Expand Down
9 changes: 8 additions & 1 deletion tests/python/shared/fixtures/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,14 @@ def kube_restore_clickhouse_db():


def _get_redis_inmem_keys_to_keep():
    """Return the in-memory Redis key names/prefixes to keep.

    Besides the RQ worker, scheduler-instance and queue entries, this
    includes the keys under which applied Redis migrations are recorded
    (the ``cvat:applied_migrations`` set and the
    ``cvat:applied_migration:`` hash prefix).
    """
    return (
        "rq:worker:",
        "rq:workers",
        "rq:scheduler_instance:",
        "rq:queues:",
        "cvat:applied_migrations",
        "cvat:applied_migration:",
    )


def docker_restore_redis_inmem():
Expand Down

0 comments on commit a1dc585

Please sign in to comment.