From 784c4b7aa789559c162c31880e2178ecf99fbccf Mon Sep 17 00:00:00 2001 From: Jonatron Date: Mon, 28 Apr 2025 17:16:24 +0100 Subject: [PATCH 1/3] Add DB indexes and make run_after not null --- ...016_alter_dbtaskresult_options_and_more.py | 45 +++++++++++++++++++ django_tasks/backends/database/models.py | 33 +++++++++++--- tests/tests/test_database_backend.py | 32 +++++++++++++ 3 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py diff --git a/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py b/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py new file mode 100644 index 0000000..ead2fea --- /dev/null +++ b/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py @@ -0,0 +1,45 @@ +# Generated by Django 5.1.4 on 2025-04-28 11:28 + +import datetime +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_tasks_database', '0015_correctly_order_run_after'), + ] + + operations = [ + migrations.AlterModelOptions( + name='dbtaskresult', + options={'ordering': [models.OrderBy(models.F('priority'), descending=True), models.OrderBy(models.F('run_after'))], 'verbose_name': 'Task Result', 'verbose_name_plural': 'Task Results'}, + ), + migrations.AlterField( + model_name='dbtaskresult', + name='backend_name', + field=models.CharField(max_length=32, verbose_name='backend name'), + ), + migrations.AlterField( + model_name='dbtaskresult', + name='queue_name', + field=models.CharField(default='default', max_length=32, verbose_name='queue name'), + ), + migrations.AlterField( + model_name='dbtaskresult', + name='run_after', + field=models.DateTimeField(default=datetime.datetime(9999, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), verbose_name='run after'), + ), + migrations.AddIndex( + model_name='dbtaskresult', + index=models.Index(models.F('status'), models.OrderBy(models.F('priority'), descending=True), models.OrderBy(models.F('run_after')), condition=models.Q(('status', 'NEW')), name='django_task_new_ordering_idx'), + ), + migrations.AddIndex( + model_name='dbtaskresult', + index=models.Index(fields=['queue_name'], name='django_task_queue_n_99c321_idx'), + ), + migrations.AddIndex( + model_name='dbtaskresult', + index=models.Index(fields=['backend_name'], name='django_task_backend_071754_idx'), + ), + ] diff --git a/django_tasks/backends/database/models.py b/django_tasks/backends/database/models.py index cca8701..e8c0632 100644 --- a/django_tasks/backends/database/models.py +++ b/django_tasks/backends/database/models.py @@ -1,3 +1,4 @@ +import datetime import logging import uuid from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar @@ -46,6 +47,9 @@ def __class_getitem__(cls, _): return cls +DATE_MAX = datetime.datetime(9999, 1, 1, tzinfo=datetime.timezone.utc) + + class DBTaskResultQuerySet(models.QuerySet): def ready(self) -> "DBTaskResultQuerySet": """ @@ -53,7 +57,7 @@ def ready(self) -> "DBTaskResultQuerySet": """ return self.filter( status=ResultStatus.NEW, - ).filter(models.Q(run_after=None) | models.Q(run_after__lte=timezone.now())) + ).filter(models.Q(run_after=DATE_MAX) | models.Q(run_after__lte=timezone.now())) def succeeded(self) -> "DBTaskResultQuerySet": return self.filter(status=ResultStatus.SUCCEEDED) @@ -95,10 +99,12 @@ class DBTaskResult(GenericBase[P, T], models.Model): task_path = models.TextField(_("task path")) - queue_name = models.TextField(_("queue name"), default=DEFAULT_QUEUE_NAME) - backend_name = models.TextField(_("backend name")) + queue_name = models.CharField( + _("queue name"), default=DEFAULT_QUEUE_NAME, max_length=32 + ) + backend_name = models.CharField(_("backend name"), max_length=32) - run_after = models.DateTimeField(_("run after"), null=True) + run_after = models.DateTimeField(_("run after"), default=DATE_MAX) return_value = models.JSONField(_("return value"), default=None, null=True) @@ -108,9 +114,18 @@ class DBTaskResult(GenericBase[P, T], models.Model): objects = DBTaskResultQuerySet.as_manager() class Meta: - ordering = [F("priority").desc(), F("run_after").asc(nulls_last=True)] + ordering = [F("priority").desc(), F("run_after").asc()] verbose_name = _("Task Result") verbose_name_plural = _("Task Results") + indexes = [ + models.Index( + "status", *ordering, + name="django_task_new_ordering_idx", + condition=Q(status=ResultStatus.NEW), + ), + models.Index(fields=["queue_name"]), + models.Index(fields=["backend_name"]), + ] if django.VERSION >= (5, 1): constraints = [ @@ -127,6 +142,11 @@ class Meta: ) ] + def save(self, **kwargs): + if self.run_after is None: + self.run_after = DATE_MAX + super().save(**kwargs) + @property def task(self) -> Task[P, T]: task = import_string(self.task_path) @@ -136,6 +156,9 @@ def task(self) -> Task[P, T]: f"Task {self.id} does not point to a Task ({self.task_path})" ) + if self.run_after == DATE_MAX: + self.run_after = None + return task.using( priority=self.priority, queue_name=self.queue_name, diff --git a/tests/tests/test_database_backend.py b/tests/tests/test_database_backend.py index 2fa841c..5b8fa52 100644 --- a/tests/tests/test_database_backend.py +++ b/tests/tests/test_database_backend.py @@ -387,6 +387,38 @@ def test_enqueue_logs(self) -> None: self.assertIn("enqueued", captured_logs.output[0]) self.assertIn(result.id, captured_logs.output[0]) + def test_index_scan_for_ready(self) -> None: + test_tasks.noop_task.enqueue() + db_task = DBTaskResult.objects.first() + new_db_tasks = [] + for _ in range(5000): + new_db_task = DBTaskResult() + for f in DBTaskResult._meta.fields: + if f.name != "id": + setattr(new_db_task, f.attname, getattr(db_task, f.attname)) + new_db_tasks.append(new_db_task) + DBTaskResult.objects.bulk_create(new_db_tasks) + + # Update query plan for certain databases + if connection.vendor == "postgresql": + with connection.cursor() as c: + c.execute(f"ANALYZE {DBTaskResult._meta.db_table};") + elif connection.vendor == "mysql": + with connection.cursor() as c: + c.execute(f"ANALYZE TABLE {DBTaskResult._meta.db_table};") + + plan = DBTaskResult.objects.ready().explain() + + if connection.vendor == "postgresql": + self.assertIn("django_task_new_ordering_idx", plan) + elif connection.vendor == "sqlite": + self.assertIn("USING INDEX django_task_new_ordering_idx", plan) + elif connection.vendor == "mysql": + self.assertIn("Index lookup", plan) + self.assertIn("using django_task_new_ordering_idx", plan) + else: + self.fail("Unknown database engine") + @override_settings( TASKS={ From 8dc8ebe53ecc501ba2c580166875b3a7564bdddd Mon Sep 17 00:00:00 2001 From: Jake Howard Date: Fri, 2 May 2025 13:16:13 +0100 Subject: [PATCH 2/3] Tweaks --- django_tasks/backends/database/apps.py | 3 + django_tasks/backends/database/backend.py | 2 +- ...016_alter_dbtaskresult_options_and_more.py | 65 +++++++++++++------ django_tasks/backends/database/models.py | 17 ++--- .../backends/database/signal_handlers.py | 12 ++++ tests/tests/test_database_backend.py | 15 ++--- 6 files changed, 72 insertions(+), 42 deletions(-) create mode 100644 django_tasks/backends/database/signal_handlers.py diff --git a/django_tasks/backends/database/apps.py b/django_tasks/backends/database/apps.py index d9de15a..d5bec65 100644 --- a/django_tasks/backends/database/apps.py +++ b/django_tasks/backends/database/apps.py @@ -5,3 +5,6 @@ class TasksAppConfig(AppConfig): name = "django_tasks.backends.database" label = "django_tasks_database" verbose_name = "Tasks Database Backend" + + def ready(self) -> None: + from . import signal_handlers # noqa diff --git a/django_tasks/backends/database/backend.py b/django_tasks/backends/database/backend.py index 89a5e53..3ae3064 100644 --- a/django_tasks/backends/database/backend.py +++ b/django_tasks/backends/database/backend.py @@ -44,7 +44,7 @@ def _task_to_db_task( priority=task.priority, task_path=task.module_path, queue_name=task.queue_name, - run_after=task.run_after, + run_after=task.run_after, # type: ignore[misc] backend_name=self.alias, ) diff --git a/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py b/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py index ead2fea..859e9f5 100644 --- a/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py +++ b/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py @@ -1,45 +1,70 @@ -# Generated by Django 5.1.4 on 2025-04-28 11:28 +# Generated by Django 5.1.5 on 2025-05-02 12:05 import datetime + from django.db import migrations, models class Migration(migrations.Migration): - dependencies = [ - ('django_tasks_database', '0015_correctly_order_run_after'), + ("django_tasks_database", "0015_correctly_order_run_after"), ] operations = [ migrations.AlterModelOptions( - name='dbtaskresult', - options={'ordering': [models.OrderBy(models.F('priority'), descending=True), models.OrderBy(models.F('run_after'))], 'verbose_name': 'Task Result', 'verbose_name_plural': 'Task Results'}, + name="dbtaskresult", + options={ + "ordering": [ + models.OrderBy(models.F("priority"), descending=True), + models.OrderBy(models.F("run_after")), + ], + "verbose_name": "Task Result", + "verbose_name_plural": "Task Results", + }, ), migrations.AlterField( - model_name='dbtaskresult', - name='backend_name', - field=models.CharField(max_length=32, verbose_name='backend name'), + model_name="dbtaskresult", + name="backend_name", + field=models.CharField(max_length=32, verbose_name="backend name"), ), migrations.AlterField( - model_name='dbtaskresult', - name='queue_name', - field=models.CharField(default='default', max_length=32, verbose_name='queue name'), + model_name="dbtaskresult", + name="queue_name", + field=models.CharField( + default="default", max_length=32, verbose_name="queue name" + ), ), migrations.AlterField( - model_name='dbtaskresult', - name='run_after', - field=models.DateTimeField(default=datetime.datetime(9999, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), verbose_name='run after'), + model_name="dbtaskresult", + name="run_after", + field=models.DateTimeField( + default=datetime.datetime( + 9999, 12, 2, 5, 59, 59, 999999, tzinfo=datetime.timezone.utc + ), + verbose_name="run after", + ), + preserve_default=False, ), migrations.AddIndex( - model_name='dbtaskresult', - index=models.Index(models.F('status'), models.OrderBy(models.F('priority'), descending=True), models.OrderBy(models.F('run_after')), condition=models.Q(('status', 'NEW')), name='django_task_new_ordering_idx'), + model_name="dbtaskresult", + index=models.Index( + models.F("status"), + models.OrderBy(models.F("priority"), descending=True), + models.OrderBy(models.F("run_after")), + condition=models.Q(("status", "NEW")), + name="django_task_new_ordering_idx", + ), ), migrations.AddIndex( - model_name='dbtaskresult', - index=models.Index(fields=['queue_name'], name='django_task_queue_n_99c321_idx'), + model_name="dbtaskresult", + index=models.Index( + fields=["queue_name"], name="django_task_queue_n_99c321_idx" + ), ), migrations.AddIndex( - model_name='dbtaskresult', - index=models.Index(fields=['backend_name'], name='django_task_backend_071754_idx'), + model_name="dbtaskresult", + index=models.Index( + fields=["backend_name"], name="django_task_backend_071754_idx" + ), ), ] diff --git a/django_tasks/backends/database/models.py b/django_tasks/backends/database/models.py index e8c0632..ae8767c 100644 --- a/django_tasks/backends/database/models.py +++ b/django_tasks/backends/database/models.py @@ -47,7 +47,7 @@ def __class_getitem__(cls, _): return cls -DATE_MAX = datetime.datetime(9999, 1, 1, tzinfo=datetime.timezone.utc) +DATE_MAX = datetime.datetime.max.replace(day=1).astimezone(datetime.timezone.utc) class DBTaskResultQuerySet(models.QuerySet): @@ -104,7 +104,7 @@ class DBTaskResult(GenericBase[P, T], models.Model): ) backend_name = models.CharField(_("backend name"), max_length=32) - run_after = models.DateTimeField(_("run after"), default=DATE_MAX) + run_after = models.DateTimeField(_("run after")) return_value = models.JSONField(_("return value"), default=None, null=True) @@ -119,7 +119,8 @@ class Meta: verbose_name_plural = _("Task Results") indexes = [ models.Index( - "status", *ordering, + "status", + *ordering, name="django_task_new_ordering_idx", condition=Q(status=ResultStatus.NEW), ), @@ -142,11 +143,6 @@ class Meta: ) ] - def save(self, **kwargs): - if self.run_after is None: - self.run_after = DATE_MAX - super().save(**kwargs) - @property def task(self) -> Task[P, T]: task = import_string(self.task_path) @@ -156,13 +152,10 @@ def task(self) -> Task[P, T]: f"Task {self.id} does not point to a Task ({self.task_path})" ) - if self.run_after == DATE_MAX: - self.run_after = None - return task.using( priority=self.priority, queue_name=self.queue_name, - run_after=self.run_after, + run_after=None if self.run_after == DATE_MAX else self.run_after, backend=self.backend_name, ) diff --git a/django_tasks/backends/database/signal_handlers.py b/django_tasks/backends/database/signal_handlers.py new file mode 100644 index 0000000..09c0d3b --- /dev/null +++ b/django_tasks/backends/database/signal_handlers.py @@ -0,0 +1,12 @@ +from typing import Any + +from django.db.models.signals import pre_save +from django.dispatch import receiver + +from .models import DATE_MAX, DBTaskResult + + +@receiver(pre_save, sender=DBTaskResult) +def set_run_after(sender: Any, instance: DBTaskResult, **kwargs: Any) -> None: + if instance.run_after is None: + instance.run_after = DATE_MAX diff --git a/tests/tests/test_database_backend.py b/tests/tests/test_database_backend.py index 5b8fa52..b7ce330 100644 --- a/tests/tests/test_database_backend.py +++ b/tests/tests/test_database_backend.py @@ -1,3 +1,4 @@ +import copy import json import logging import os @@ -389,15 +390,11 @@ def test_enqueue_logs(self) -> None: def test_index_scan_for_ready(self) -> None: test_tasks.noop_task.enqueue() - db_task = DBTaskResult.objects.first() - new_db_tasks = [] - for _ in range(5000): - new_db_task = DBTaskResult() - for f in DBTaskResult._meta.fields: - if f.name != "id": - setattr(new_db_task, f.attname, getattr(db_task, f.attname)) - new_db_tasks.append(new_db_task) - DBTaskResult.objects.bulk_create(new_db_tasks) + + # Quickly duplicate tasks + db_task = DBTaskResult.objects.get() + db_task.id = None + DBTaskResult.objects.bulk_create([copy.copy(db_task) for _ in range(5000)]) # Update query plan for certain databases if connection.vendor == "postgresql": From 548a84240aa9496411d39674a245cee0e2d975e9 Mon Sep 17 00:00:00 2001 From: Jake Howard Date: Fri, 2 May 2025 14:49:03 +0100 Subject: [PATCH 3/3] Simplify `DATE_MAX` generation to keep Windows happy --- .../migrations/0016_alter_dbtaskresult_options_and_more.py | 4 ++-- django_tasks/backends/database/models.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py b/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py index 859e9f5..eefa19e 100644 --- a/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py +++ b/django_tasks/backends/database/migrations/0016_alter_dbtaskresult_options_and_more.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.5 on 2025-05-02 12:05 +# Generated by Django 5.2 on 2025-05-02 13:48 import datetime @@ -39,7 +39,7 @@ class Migration(migrations.Migration): name="run_after", field=models.DateTimeField( default=datetime.datetime( - 9999, 12, 2, 5, 59, 59, 999999, tzinfo=datetime.timezone.utc + 9999, 1, 1, 0, 0, tzinfo=datetime.timezone.utc ), verbose_name="run after", ), diff --git a/django_tasks/backends/database/models.py b/django_tasks/backends/database/models.py index ae8767c..59de8eb 100644 --- a/django_tasks/backends/database/models.py +++ b/django_tasks/backends/database/models.py @@ -47,7 +47,7 @@ def __class_getitem__(cls, _): return cls -DATE_MAX = datetime.datetime.max.replace(day=1).astimezone(datetime.timezone.utc) +DATE_MAX = datetime.datetime(9999, 1, 1, tzinfo=datetime.timezone.utc) class DBTaskResultQuerySet(models.QuerySet):