Skip to content

Commit c0b11e7

Browse files
jonatron authored and RealOrangeOne committed
Add DB indexes and make run_after not null
1 parent a4cb083 commit c0b11e7

File tree

3 files changed

+105
-5
lines changed

3 files changed

+105
-5
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,45 @@
1+
# Generated by Django 5.1.4 on 2025-04-28 11:28
2+
3+
import datetime
4+
from django.db import migrations, models
5+
6+
7+
class Migration(migrations.Migration):
    """Add lookup indexes to ``dbtaskresult`` and make ``run_after`` non-nullable.

    Operations, in order:

    * reset the model ordering to ``priority`` descending, then ``run_after``;
    * alter ``backend_name`` and ``queue_name`` to ``CharField(max_length=32)``;
    * alter ``run_after`` to a ``DateTimeField`` without ``null=True`` whose
      default is 9999-01-01 00:00 UTC;
    * add one conditional index for rows with status ``NEW`` and one
      single-column index each for ``queue_name`` and ``backend_name``.
    """

    dependencies = [
        ("django_tasks_database", "0015_correctly_order_run_after"),
    ]

    operations = [
        migrations.AlterModelOptions(
            name="dbtaskresult",
            options={
                "ordering": [
                    models.OrderBy(models.F("priority"), descending=True),
                    models.OrderBy(models.F("run_after")),
                ],
                "verbose_name": "Task Result",
                "verbose_name_plural": "Task Results",
            },
        ),
        migrations.AlterField(
            model_name="dbtaskresult",
            name="backend_name",
            field=models.CharField(
                max_length=32,
                verbose_name="backend name",
            ),
        ),
        migrations.AlterField(
            model_name="dbtaskresult",
            name="queue_name",
            field=models.CharField(
                default="default",
                max_length=32,
                verbose_name="queue name",
            ),
        ),
        # The far-future default takes the place of NULL in this column.
        migrations.AlterField(
            model_name="dbtaskresult",
            name="run_after",
            field=models.DateTimeField(
                default=datetime.datetime(
                    9999, 1, 1, 0, 0, tzinfo=datetime.timezone.utc
                ),
                verbose_name="run after",
            ),
        ),
        # Index columns follow the model ordering above, prefixed by status
        # and restricted to rows whose status is NEW.
        migrations.AddIndex(
            model_name="dbtaskresult",
            index=models.Index(
                models.F("status"),
                models.OrderBy(models.F("priority"), descending=True),
                models.OrderBy(models.F("run_after")),
                condition=models.Q(("status", "NEW")),
                name="django_task_new_ordering_idx",
            ),
        ),
        migrations.AddIndex(
            model_name="dbtaskresult",
            index=models.Index(
                fields=["queue_name"],
                name="django_task_queue_n_99c321_idx",
            ),
        ),
        migrations.AddIndex(
            model_name="dbtaskresult",
            index=models.Index(
                fields=["backend_name"],
                name="django_task_backend_071754_idx",
            ),
        ),
    ]

django_tasks/backends/database/models.py

+28-5
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import logging
23
import uuid
34
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar
@@ -46,14 +47,17 @@ def __class_getitem__(cls, _):
4647
return cls
4748

4849

50+
# Sentinel standing in for "no run_after set". It is the default of
# DBTaskResult.run_after (which no longer allows NULL), DBTaskResult.save()
# substitutes it for None, and DBTaskResultQuerySet.ready() matches it
# explicitly so such rows still count as ready.
DATE_MAX = datetime.datetime(9999, 1, 1, tzinfo=datetime.timezone.utc)
51+
52+
4953
class DBTaskResultQuerySet(models.QuerySet):
5054
def ready(self) -> "DBTaskResultQuerySet":
5155
"""
5256
Return tasks which are ready to be processed.
5357
"""
5458
return self.filter(
5559
status=ResultStatus.NEW,
56-
).filter(models.Q(run_after=None) | models.Q(run_after__lte=timezone.now()))
60+
).filter(models.Q(run_after=DATE_MAX) | models.Q(run_after__lte=timezone.now()))
5761

5862
def succeeded(self) -> "DBTaskResultQuerySet":
5963
return self.filter(status=ResultStatus.SUCCEEDED)
@@ -95,10 +99,12 @@ class DBTaskResult(GenericBase[P, T], models.Model):
9599

96100
task_path = models.TextField(_("task path"))
97101

98-
queue_name = models.TextField(_("queue name"), default=DEFAULT_QUEUE_NAME)
99-
backend_name = models.TextField(_("backend name"))
102+
queue_name = models.CharField(
103+
_("queue name"), default=DEFAULT_QUEUE_NAME, max_length=32
104+
)
105+
backend_name = models.CharField(_("backend name"), max_length=32)
100106

101-
run_after = models.DateTimeField(_("run after"), null=True)
107+
run_after = models.DateTimeField(_("run after"), default=DATE_MAX)
102108

103109
return_value = models.JSONField(_("return value"), default=None, null=True)
104110

@@ -108,9 +114,18 @@ class DBTaskResult(GenericBase[P, T], models.Model):
108114
objects = DBTaskResultQuerySet.as_manager()
109115

110116
class Meta:
111-
ordering = [F("priority").desc(), F("run_after").asc(nulls_last=True)]
117+
ordering = [F("priority").desc(), F("run_after").asc()]
112118
verbose_name = _("Task Result")
113119
verbose_name_plural = _("Task Results")
120+
indexes = [
121+
models.Index(
122+
"status", *ordering,
123+
name="django_task_new_ordering_idx",
124+
condition=Q(status=ResultStatus.NEW),
125+
),
126+
models.Index(fields=["queue_name"]),
127+
models.Index(fields=["backend_name"]),
128+
]
114129

115130
if django.VERSION >= (5, 1):
116131
constraints = [
@@ -127,6 +142,11 @@ class Meta:
127142
)
128143
]
129144

145+
def save(self, *args, **kwargs):
    """Persist the row, storing ``DATE_MAX`` when ``run_after`` is unset.

    ``run_after`` may be ``None`` in memory, so it is replaced with the
    ``DATE_MAX`` sentinel before the row is written.

    Positional and keyword arguments are both forwarded unchanged to the
    parent ``save()``, so callers that still pass arguments positionally
    keep working.
    """
    if self.run_after is None:
        self.run_after = DATE_MAX
    super().save(*args, **kwargs)
149+
130150
@property
131151
def task(self) -> Task[P, T]:
132152
task = import_string(self.task_path)
@@ -136,6 +156,9 @@ def task(self) -> Task[P, T]:
136156
f"Task {self.id} does not point to a Task ({self.task_path})"
137157
)
138158

159+
if self.run_after == DATE_MAX:
160+
self.run_after = None
161+
139162
return task.using(
140163
priority=self.priority,
141164
queue_name=self.queue_name,

tests/tests/test_database_backend.py

+32
Original file line number | Diff line number | Diff line change
@@ -387,6 +387,38 @@ def test_enqueue_logs(self) -> None:
387387
self.assertIn("enqueued", captured_logs.output[0])
388388
self.assertIn(result.id, captured_logs.output[0])
389389

390+
def test_index_scan_for_ready(self) -> None:
    """Check that the ``ready()`` query plan uses ``django_task_new_ordering_idx``.

    Enqueues one task, bulk-creates 5000 copies of its row, refreshes table
    statistics on PostgreSQL and MySQL, then inspects the ``EXPLAIN`` output
    for the vendor-specific mention of the index. Any other database vendor
    fails the test.
    """
    test_tasks.noop_task.enqueue()
    template = DBTaskResult.objects.first()

    # Every column except ``id`` is copied, so each clone keeps whatever id
    # ``DBTaskResult()`` gave it.
    copied_attnames = [
        field.attname for field in DBTaskResult._meta.fields if field.name != "id"
    ]
    clones = []
    for _ in range(5000):
        clone = DBTaskResult()
        for attname in copied_attnames:
            setattr(clone, attname, getattr(template, attname))
        clones.append(clone)
    DBTaskResult.objects.bulk_create(clones)

    vendor = connection.vendor
    table = DBTaskResult._meta.db_table

    # Update query plan for certain databases
    analyze_sql = {
        "postgresql": f"ANALYZE {table};",
        "mysql": f"ANALYZE TABLE {table};",
    }.get(vendor)
    if analyze_sql is not None:
        with connection.cursor() as cursor:
            cursor.execute(analyze_sql)

    plan = DBTaskResult.objects.ready().explain()

    if vendor == "postgresql":
        self.assertIn("django_task_new_ordering_idx", plan)
    elif vendor == "sqlite":
        self.assertIn("USING INDEX django_task_new_ordering_idx", plan)
    elif vendor == "mysql":
        self.assertIn("Index lookup", plan)
        self.assertIn("using django_task_new_ordering_idx", plan)
    else:
        self.fail("Unknown database engine")
421+
390422

391423
@override_settings(
392424
TASKS={

0 commit comments

Comments
 (0)