Skip to content

Commit 89e4ba2

Browse files
authored
Merge pull request #284 from buildingSMART/IVS-720_Fix_Platform_Fairness_via_Locks
2 parents 6a69f67 + ad8d6d0 commit 89e4ba2

File tree

4 files changed

+25
-10
lines changed

4 files changed

+25
-10
lines changed

backend/apps/ifc_validation/tasks/task_runner.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import functools
22
import psutil
3+
import random
34

45
from celery import shared_task, chain, chord, group
56
from celery.exceptions import SoftTimeLimitExceeded
67

7-
from core.redis_lock import acquire_user_lock
8+
from core.redis_lock import acquire_user_lock, LockError
89
from core.utils import log_execution
910

1011
from apps.ifc_validation_models.decorators import requires_django_user_context
@@ -66,10 +67,14 @@ def wrapper(self, *args, **kwargs):
6667
with acquire_user_lock(user_id, task_name):
6768
return task_func(self, *args, **kwargs)
6869

69-
except RuntimeError as exc:
70-
# countdown: exponential backoff - prevents thundering herd
71-
# max_retries=0: infinite retries - only for locking purposes
72-
raise self.retry(exc=exc, countdown=15 + self.request.retries * 7, max_retries=0)
70+
except LockError as exc:
71+
base_backoff = 10 + self.request.retries * 7
72+
jitter_backoff = random.randint(0,10)
73+
raise self.retry(
74+
exc=exc,
75+
countdown=base_backoff+jitter_backoff,
76+
max_retries=None # infinite
77+
)
7378

7479
except Exception as exc:
7580
# other errors — fail permanently
@@ -163,11 +168,11 @@ def on_workflow_failed(self, *args, **kwargs):
163168
def task_factory(task_type):
164169
config = task_registry[task_type]
165170

166-
@shared_task(bind=True, name=config.celery_task_name)
171+
@shared_task(bind=True, name=config.celery_task_name, max_retries=None)
172+
@with_user_task_lock(task_name=config.celery_task_name)
167173
@log_execution
168174
@requires_django_user_context
169175
@kill_subprocesses_on_timeout
170-
@with_user_task_lock(task_name=config.celery_task_name)
171176
def validation_subtask_runner(self, *args, **kwargs):
172177

173178
id = kwargs.get('id')

backend/core/celery.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@ def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
2828
send_failed_event=send_failed_event,
2929
return_ok=return_ok
3030
)
31-
logger.error(f'Failure detected for task {self.task.name} - {exc_info=}')
31+
logger.error(f'Failure detected for task {self.task.name} - {exc_info}')
32+
33+
def on_retry(self, exc_info):
34+
35+
super().on_retry(exc_info)
36+
logger.warning(f"Retry detected for task {self.task.name} - {exc_info}")
3237

3338

3439
class BaseTask(Task):
@@ -49,7 +54,7 @@ def on_success(self, retval, task_id, args, kwargs):
4954
logger.debug("*** SUCCESS ***")
5055

5156
def on_retry(self, exc, task_id, args, kwargs, einfo):
52-
logger.warn("*** RETRY ***")
57+
logger.warning("*** RETRY ***")
5358

5459
def on_failure(self, exc, task_id, args, kwargs, einfo):
5560
logger.error("*** FAILURE ***")

backend/core/redis_lock.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
redis_client: Redis = Redis.from_url(CELERY_BROKER_URL, decode_responses=True)
1010

1111

12+
class LockError(Exception):
13+
pass
14+
15+
1216
@contextmanager
1317
def acquire_user_lock(
1418
user_id: int | str,
@@ -32,7 +36,7 @@ def acquire_user_lock(
3236

3337
acquired = lock.acquire(blocking=True)
3438
if not acquired:
35-
raise RuntimeError(f"User {user_id} already has an active task_name {task_name} (lock held).")
39+
raise LockError(f"User {user_id} already has an active task_name {task_name} (lock held).")
3640

3741
try:
3842
logger.info(f"Lock acquired for user {user_id} and task {task_name} (key={lock_key})")

backend/core/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@
338338
CELERY_TASK_ALLOW_ERROR_CB_ON_CHORD_HEADER = True
339339

340340
# reliability settings - see https://www.francoisvoron.com/blog/configure-celery-for-reliable-delivery
341+
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
341342
CELERY_TASK_REJECT_ON_WORKER_LOST = True
342343
CELERY_TASK_ACKS_LATE = True
343344
CELERY_TASK_STORE_ERRORS_EVEN_IF_IGNORED = True

0 commit comments

Comments
 (0)