From ea0f451bf8dd9299d4a3dc340058257ec6213b7a Mon Sep 17 00:00:00 2001 From: Jake Howard Date: Sun, 16 Feb 2025 18:20:08 +0000 Subject: [PATCH 1/2] Add timeout to task definition --- django_tasks/backends/database/backend.py | 1 + .../migrations/0015_dbtaskresult_timeout.py | 17 +++++++++++++++++ django_tasks/backends/database/models.py | 3 +++ django_tasks/task.py | 12 +++++++++++- 4 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 django_tasks/backends/database/migrations/0015_dbtaskresult_timeout.py diff --git a/django_tasks/backends/database/backend.py b/django_tasks/backends/database/backend.py index 89a5e53..d5e4e4a 100644 --- a/django_tasks/backends/database/backend.py +++ b/django_tasks/backends/database/backend.py @@ -46,6 +46,7 @@ def _task_to_db_task( queue_name=task.queue_name, run_after=task.run_after, backend_name=self.alias, + timeout=task.timeout, ) def enqueue( diff --git a/django_tasks/backends/database/migrations/0015_dbtaskresult_timeout.py b/django_tasks/backends/database/migrations/0015_dbtaskresult_timeout.py new file mode 100644 index 0000000..e61f48d --- /dev/null +++ b/django_tasks/backends/database/migrations/0015_dbtaskresult_timeout.py @@ -0,0 +1,17 @@ +# Generated by Django 5.1.6 on 2025-02-16 16:53 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("django_tasks_database", "0014_remove_dbtaskresult_exception_data"), + ] + + operations = [ + migrations.AddField( + model_name="dbtaskresult", + name="timeout", + field=models.PositiveBigIntegerField(default=600, verbose_name="timeout"), + ), + ] diff --git a/django_tasks/backends/database/models.py b/django_tasks/backends/database/models.py index 720157c..48786a6 100644 --- a/django_tasks/backends/database/models.py +++ b/django_tasks/backends/database/models.py @@ -15,6 +15,7 @@ from django_tasks.task import ( DEFAULT_PRIORITY, DEFAULT_QUEUE_NAME, + DEFAULT_TIMEOUT, MAX_PRIORITY, MIN_PRIORITY, ResultStatus, @@ -99,6 +100,7 @@ class DBTaskResult(GenericBase[P, T], models.Model): backend_name = models.TextField(_("backend name")) run_after = models.DateTimeField(_("run after"), null=True) + timeout = models.PositiveBigIntegerField(_("timeout"), default=DEFAULT_TIMEOUT) return_value = models.JSONField(_("return value"), default=None, null=True) @@ -141,6 +143,7 @@ def task(self) -> Task[P, T]: queue_name=self.queue_name, run_after=self.run_after, backend=self.backend_name, + timeout=self.timeout, ) @property diff --git a/django_tasks/task.py b/django_tasks/task.py index f24cda8..045c79a 100644 --- a/django_tasks/task.py +++ b/django_tasks/task.py @@ -32,6 +32,7 @@ MIN_PRIORITY = -100 MAX_PRIORITY = 100 DEFAULT_PRIORITY = 0 +DEFAULT_TIMEOUT = 600 # 10 minutes TASK_REFRESH_ATTRS = { "_exception_class", @@ -78,6 +79,9 @@ class Task(Generic[P, T]): immediately, or whatever the backend decides """ + timeout: int = DEFAULT_TIMEOUT + """The maximum duration the task can take to execute before being aborted""" + def __post_init__(self) -> None: self.get_backend().validate_task(self) @@ -95,6 +99,7 @@ def using( queue_name: Optional[str] = None, run_after: Optional[Union[datetime, timedelta]] = None, backend: Optional[str] = None, + timeout: Optional[int] = None, ) -> Self: """ Create a new task with modified defaults @@ -110,6 +115,8 @@ def using( changes["run_after"] = run_after if backend is not None: changes["backend"] = backend + if timeout is not None: + changes["timeout"] = timeout return replace(self, **changes) @@ -188,6 +195,7 @@ def task( queue_name: str = DEFAULT_QUEUE_NAME, backend: str = DEFAULT_TASK_BACKEND_ALIAS, enqueue_on_commit: Optional[bool] = None, + timeout: int = DEFAULT_TIMEOUT, ) -> Callable[[Callable[P, T]], Task[P, T]]: ... @@ -199,6 +207,7 @@ def task( queue_name: str = DEFAULT_QUEUE_NAME, backend: str = DEFAULT_TASK_BACKEND_ALIAS, enqueue_on_commit: Optional[bool] = None, + timeout: int = DEFAULT_TIMEOUT, ) -> Union[Task[P, T], Callable[[Callable[P, T]], Task[P, T]]]: """ A decorator used to create a task. @@ -212,6 +221,7 @@ def wrapper(f: Callable[P, T]) -> Task[P, T]: queue_name=queue_name, backend=backend, enqueue_on_commit=enqueue_on_commit, + timeout=timeout, ) if function: @@ -223,7 +233,7 @@ def wrapper(f: Callable[P, T]) -> Task[P, T]: @dataclass(frozen=True) class TaskResult(Generic[T]): task: Task - """The task for which this is a result""" + """The task for which this is a result, as it was run""" id: str """A unique identifier for the task result""" From 3253c63235f17c24679d8d4c71dfdb19c7638da8 Mon Sep 17 00:00:00 2001 From: Jake Howard Date: Sun, 16 Feb 2025 18:21:46 +0000 Subject: [PATCH 2/2] Add basic timeout helper It's not perfect, as it doesn't work if the block is outside Python. --- django_tasks/exceptions.py | 6 ++++++ django_tasks/utils.py | 44 +++++++++++++++++++++++++++++++++++++- tests/tests/test_utils.py | 9 ++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/django_tasks/exceptions.py b/django_tasks/exceptions.py index 0e59eb7..d9f6cae 100644 --- a/django_tasks/exceptions.py +++ b/django_tasks/exceptions.py @@ -13,3 +13,9 @@ class InvalidTaskBackendError(ImproperlyConfigured): class ResultDoesNotExist(ObjectDoesNotExist): pass + + +class TimeoutException(BaseException): + """ + Something timed out. + """ diff --git a/django_tasks/utils.py b/django_tasks/utils.py index b8663c3..a1e20fb 100644 --- a/django_tasks/utils.py +++ b/django_tasks/utils.py @@ -1,14 +1,19 @@ +import ctypes import inspect import json import random import time +from contextlib import contextmanager from functools import wraps +from threading import Timer, current_thread from traceback import format_exception -from typing import Any, Callable, TypeVar +from typing import Any, Callable, Iterator, TypeVar from django.utils.crypto import RANDOM_STRING_CHARS from typing_extensions import ParamSpec +from .exceptions import TimeoutException + T = TypeVar("T") P = ParamSpec("P") @@ -74,3 +79,40 @@ def get_random_id() -> str: it's not cryptographically secure. """ return "".join(random.choices(RANDOM_STRING_CHARS, k=32)) + + +def _do_timeout(tid: int) -> None: + """ + Raise `TimeoutException` in the given thread. + + Here be dragons. + """ + # Since we're in Python, no GIL lock is needed + ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_ulong(tid), ctypes.py_object(TimeoutException) + ) + + if ret == 0: + raise RuntimeError("Timeout failed - thread not found") + elif ret != 1: + ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None) + raise RuntimeError("Timeout failed") + + +@contextmanager +def timeout(timeout: float) -> Iterator[Timer]: + """ + Run the wrapped code for at most `timeout` seconds before aborting. + + This works by starting a timer thread, and using "magic" raises an exception + in the main process after the timer expires. + + Raises `TimeoutException` when the timeout occurs. + """ + timeout_timer = Timer(timeout, _do_timeout, args=[current_thread().ident]) + try: + timeout_timer.start() + + yield timeout_timer + finally: + timeout_timer.cancel() diff --git a/tests/tests/test_utils.py b/tests/tests/test_utils.py index 48ba237..bf78d42 100644 --- a/tests/tests/test_utils.py +++ b/tests/tests/test_utils.py @@ -1,10 +1,12 @@ import datetime import subprocess +import time from unittest.mock import Mock from django.test import SimpleTestCase from django_tasks import utils +from django_tasks.exceptions import TimeoutException from tests import tasks as test_tasks @@ -116,3 +118,10 @@ def test_complex_exception(self) -> None: self.assertIn("KeyError: datetime.datetime", traceback) else: self.fail("KeyError not raised") + + +class TimeoutTestCase(SimpleTestCase): + def test_sleep_timeout(self) -> None: + with self.assertRaises(TimeoutException): + with utils.timeout(0.25): + time.sleep(0.5)