Skip to content

Commit 6c67bf2

Browse files
Add DB indexes (#155)
Co-authored-by: Jonatron <[email protected]>
1 parent 31312ed commit 6c67bf2

File tree

6 files changed

+137
-7
lines changed

6 files changed

+137
-7
lines changed

django_tasks/backends/database/apps.py

+3
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,6 @@ class TasksAppConfig(AppConfig):
55
name = "django_tasks.backends.database"
66
label = "django_tasks_database"
77
verbose_name = "Tasks Database Backend"
8+
9+
def ready(self) -> None:
10+
from . import signal_handlers # noqa

django_tasks/backends/database/backend.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def _task_to_db_task(
4444
priority=task.priority,
4545
task_path=task.module_path,
4646
queue_name=task.queue_name,
47-
run_after=task.run_after,
47+
run_after=task.run_after, # type: ignore[misc]
4848
backend_name=self.alias,
4949
)
5050

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Generated by Django 5.2 on 2025-05-02 13:48
2+
3+
import datetime
4+
5+
from django.db import migrations, models
6+
7+
8+
class Migration(migrations.Migration):
9+
dependencies = [
10+
("django_tasks_database", "0015_correctly_order_run_after"),
11+
]
12+
13+
operations = [
14+
migrations.AlterModelOptions(
15+
name="dbtaskresult",
16+
options={
17+
"ordering": [
18+
models.OrderBy(models.F("priority"), descending=True),
19+
models.OrderBy(models.F("run_after")),
20+
],
21+
"verbose_name": "Task Result",
22+
"verbose_name_plural": "Task Results",
23+
},
24+
),
25+
migrations.AlterField(
26+
model_name="dbtaskresult",
27+
name="backend_name",
28+
field=models.CharField(max_length=32, verbose_name="backend name"),
29+
),
30+
migrations.AlterField(
31+
model_name="dbtaskresult",
32+
name="queue_name",
33+
field=models.CharField(
34+
default="default", max_length=32, verbose_name="queue name"
35+
),
36+
),
37+
migrations.AlterField(
38+
model_name="dbtaskresult",
39+
name="run_after",
40+
field=models.DateTimeField(
41+
default=datetime.datetime(
42+
9999, 1, 1, 0, 0, tzinfo=datetime.timezone.utc
43+
),
44+
verbose_name="run after",
45+
),
46+
preserve_default=False,
47+
),
48+
migrations.AddIndex(
49+
model_name="dbtaskresult",
50+
index=models.Index(
51+
models.F("status"),
52+
models.OrderBy(models.F("priority"), descending=True),
53+
models.OrderBy(models.F("run_after")),
54+
condition=models.Q(("status", "NEW")),
55+
name="django_task_new_ordering_idx",
56+
),
57+
),
58+
migrations.AddIndex(
59+
model_name="dbtaskresult",
60+
index=models.Index(
61+
fields=["queue_name"], name="django_task_queue_n_99c321_idx"
62+
),
63+
),
64+
migrations.AddIndex(
65+
model_name="dbtaskresult",
66+
index=models.Index(
67+
fields=["backend_name"], name="django_task_backend_071754_idx"
68+
),
69+
),
70+
]

django_tasks/backends/database/models.py

+22-6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import logging
23
import uuid
34
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar
@@ -46,14 +47,17 @@ def __class_getitem__(cls, _):
4647
return cls
4748

4849

50+
DATE_MAX = datetime.datetime(9999, 1, 1, tzinfo=datetime.timezone.utc)
51+
52+
4953
class DBTaskResultQuerySet(models.QuerySet):
5054
def ready(self) -> "DBTaskResultQuerySet":
5155
"""
5256
Return tasks which are ready to be processed.
5357
"""
5458
return self.filter(
5559
status=ResultStatus.NEW,
56-
).filter(models.Q(run_after=None) | models.Q(run_after__lte=timezone.now()))
60+
).filter(models.Q(run_after=DATE_MAX) | models.Q(run_after__lte=timezone.now()))
5761

5862
def succeeded(self) -> "DBTaskResultQuerySet":
5963
return self.filter(status=ResultStatus.SUCCEEDED)
@@ -95,10 +99,12 @@ class DBTaskResult(GenericBase[P, T], models.Model):
9599

96100
task_path = models.TextField(_("task path"))
97101

98-
queue_name = models.TextField(_("queue name"), default=DEFAULT_QUEUE_NAME)
99-
backend_name = models.TextField(_("backend name"))
102+
queue_name = models.CharField(
103+
_("queue name"), default=DEFAULT_QUEUE_NAME, max_length=32
104+
)
105+
backend_name = models.CharField(_("backend name"), max_length=32)
100106

101-
run_after = models.DateTimeField(_("run after"), null=True)
107+
run_after = models.DateTimeField(_("run after"))
102108

103109
return_value = models.JSONField(_("return value"), default=None, null=True)
104110

@@ -108,9 +114,19 @@ class DBTaskResult(GenericBase[P, T], models.Model):
108114
objects = DBTaskResultQuerySet.as_manager()
109115

110116
class Meta:
111-
ordering = [F("priority").desc(), F("run_after").asc(nulls_last=True)]
117+
ordering = [F("priority").desc(), F("run_after").asc()]
112118
verbose_name = _("Task Result")
113119
verbose_name_plural = _("Task Results")
120+
indexes = [
121+
models.Index(
122+
"status",
123+
*ordering,
124+
name="django_task_new_ordering_idx",
125+
condition=Q(status=ResultStatus.NEW),
126+
),
127+
models.Index(fields=["queue_name"]),
128+
models.Index(fields=["backend_name"]),
129+
]
114130

115131
if django.VERSION >= (5, 1):
116132
constraints = [
@@ -139,7 +155,7 @@ def task(self) -> Task[P, T]:
139155
return task.using(
140156
priority=self.priority,
141157
queue_name=self.queue_name,
142-
run_after=self.run_after,
158+
run_after=None if self.run_after == DATE_MAX else self.run_after,
143159
backend=self.backend_name,
144160
)
145161

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from typing import Any
2+
3+
from django.db.models.signals import pre_save
4+
from django.dispatch import receiver
5+
6+
from .models import DATE_MAX, DBTaskResult
7+
8+
9+
@receiver(pre_save, sender=DBTaskResult)
10+
def set_run_after(sender: Any, instance: DBTaskResult, **kwargs: Any) -> None:
11+
if instance.run_after is None:
12+
instance.run_after = DATE_MAX

tests/tests/test_database_backend.py

+29
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import json
23
import logging
34
import os
@@ -387,6 +388,34 @@ def test_enqueue_logs(self) -> None:
387388
self.assertIn("enqueued", captured_logs.output[0])
388389
self.assertIn(result.id, captured_logs.output[0])
389390

391+
def test_index_scan_for_ready(self) -> None:
392+
test_tasks.noop_task.enqueue()
393+
394+
# Quickly duplicate tasks
395+
db_task = DBTaskResult.objects.get()
396+
db_task.id = None
397+
DBTaskResult.objects.bulk_create([copy.copy(db_task) for _ in range(5000)])
398+
399+
# Update query plan for certain databases
400+
if connection.vendor == "postgresql":
401+
with connection.cursor() as c:
402+
c.execute(f"ANALYZE {DBTaskResult._meta.db_table};")
403+
elif connection.vendor == "mysql":
404+
with connection.cursor() as c:
405+
c.execute(f"ANALYZE TABLE {DBTaskResult._meta.db_table};")
406+
407+
plan = DBTaskResult.objects.ready().explain()
408+
409+
if connection.vendor == "postgresql":
410+
self.assertIn("django_task_new_ordering_idx", plan)
411+
elif connection.vendor == "sqlite":
412+
self.assertIn("USING INDEX django_task_new_ordering_idx", plan)
413+
elif connection.vendor == "mysql":
414+
self.assertIn("Index lookup", plan)
415+
self.assertIn("using django_task_new_ordering_idx", plan)
416+
else:
417+
self.fail("Unknown database engine")
418+
390419

391420
@override_settings(
392421
TASKS={

0 commit comments

Comments
 (0)