Skip to content

Add DB indexes #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions django_tasks/backends/database/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ class TasksAppConfig(AppConfig):
name = "django_tasks.backends.database"
label = "django_tasks_database"
verbose_name = "Tasks Database Backend"

def ready(self) -> None:
from . import signal_handlers # noqa
2 changes: 1 addition & 1 deletion django_tasks/backends/database/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def _task_to_db_task(
priority=task.priority,
task_path=task.module_path,
queue_name=task.queue_name,
run_after=task.run_after,
run_after=task.run_after, # type: ignore[misc]
backend_name=self.alias,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Generated by Django 5.2 on 2025-05-02 13:48

import datetime

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("django_tasks_database", "0015_correctly_order_run_after"),
]

operations = [
migrations.AlterModelOptions(
name="dbtaskresult",
options={
"ordering": [
models.OrderBy(models.F("priority"), descending=True),
models.OrderBy(models.F("run_after")),
],
"verbose_name": "Task Result",
"verbose_name_plural": "Task Results",
},
),
migrations.AlterField(
model_name="dbtaskresult",
name="backend_name",
field=models.CharField(max_length=32, verbose_name="backend name"),
),
migrations.AlterField(
model_name="dbtaskresult",
name="queue_name",
field=models.CharField(
default="default", max_length=32, verbose_name="queue name"
),
),
migrations.AlterField(
model_name="dbtaskresult",
name="run_after",
field=models.DateTimeField(
default=datetime.datetime(
9999, 1, 1, 0, 0, tzinfo=datetime.timezone.utc
),
verbose_name="run after",
),
preserve_default=False,
),
migrations.AddIndex(
model_name="dbtaskresult",
index=models.Index(
models.F("status"),
models.OrderBy(models.F("priority"), descending=True),
models.OrderBy(models.F("run_after")),
condition=models.Q(("status", "NEW")),
name="django_task_new_ordering_idx",
),
),
migrations.AddIndex(
model_name="dbtaskresult",
index=models.Index(
fields=["queue_name"], name="django_task_queue_n_99c321_idx"
),
),
migrations.AddIndex(
model_name="dbtaskresult",
index=models.Index(
fields=["backend_name"], name="django_task_backend_071754_idx"
),
),
]
28 changes: 22 additions & 6 deletions django_tasks/backends/database/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
import uuid
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar
Expand Down Expand Up @@ -46,14 +47,17 @@ def __class_getitem__(cls, _):
return cls


DATE_MAX = datetime.datetime(9999, 1, 1, tzinfo=datetime.timezone.utc)


class DBTaskResultQuerySet(models.QuerySet):
def ready(self) -> "DBTaskResultQuerySet":
"""
Return tasks which are ready to be processed.
"""
return self.filter(
status=ResultStatus.NEW,
).filter(models.Q(run_after=None) | models.Q(run_after__lte=timezone.now()))
).filter(models.Q(run_after=DATE_MAX) | models.Q(run_after__lte=timezone.now()))

def succeeded(self) -> "DBTaskResultQuerySet":
return self.filter(status=ResultStatus.SUCCEEDED)
Expand Down Expand Up @@ -95,10 +99,12 @@ class DBTaskResult(GenericBase[P, T], models.Model):

task_path = models.TextField(_("task path"))

queue_name = models.TextField(_("queue name"), default=DEFAULT_QUEUE_NAME)
backend_name = models.TextField(_("backend name"))
queue_name = models.CharField(
_("queue name"), default=DEFAULT_QUEUE_NAME, max_length=32
)
backend_name = models.CharField(_("backend name"), max_length=32)

run_after = models.DateTimeField(_("run after"), null=True)
run_after = models.DateTimeField(_("run after"))

return_value = models.JSONField(_("return value"), default=None, null=True)

Expand All @@ -108,9 +114,19 @@ class DBTaskResult(GenericBase[P, T], models.Model):
objects = DBTaskResultQuerySet.as_manager()

class Meta:
ordering = [F("priority").desc(), F("run_after").asc(nulls_last=True)]
ordering = [F("priority").desc(), F("run_after").asc()]
verbose_name = _("Task Result")
verbose_name_plural = _("Task Results")
indexes = [
models.Index(
"status",
*ordering,
name="django_task_new_ordering_idx",
condition=Q(status=ResultStatus.NEW),
),
models.Index(fields=["queue_name"]),
models.Index(fields=["backend_name"]),
]

if django.VERSION >= (5, 1):
constraints = [
Expand Down Expand Up @@ -139,7 +155,7 @@ def task(self) -> Task[P, T]:
return task.using(
priority=self.priority,
queue_name=self.queue_name,
run_after=self.run_after,
run_after=None if self.run_after == DATE_MAX else self.run_after,
backend=self.backend_name,
)

Expand Down
12 changes: 12 additions & 0 deletions django_tasks/backends/database/signal_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Any

from django.db.models.signals import pre_save
from django.dispatch import receiver

from .models import DATE_MAX, DBTaskResult


@receiver(pre_save, sender=DBTaskResult)
def set_run_after(sender: Any, instance: DBTaskResult, **kwargs: Any) -> None:
if instance.run_after is None:
instance.run_after = DATE_MAX
29 changes: 29 additions & 0 deletions tests/tests/test_database_backend.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import json
import logging
import os
Expand Down Expand Up @@ -387,6 +388,34 @@ def test_enqueue_logs(self) -> None:
self.assertIn("enqueued", captured_logs.output[0])
self.assertIn(result.id, captured_logs.output[0])

def test_index_scan_for_ready(self) -> None:
test_tasks.noop_task.enqueue()

# Quickly duplicate tasks
db_task = DBTaskResult.objects.get()
db_task.id = None
DBTaskResult.objects.bulk_create([copy.copy(db_task) for _ in range(5000)])

# Update query plan for certain databases
if connection.vendor == "postgresql":
with connection.cursor() as c:
c.execute(f"ANALYZE {DBTaskResult._meta.db_table};")
elif connection.vendor == "mysql":
with connection.cursor() as c:
c.execute(f"ANALYZE TABLE {DBTaskResult._meta.db_table};")

plan = DBTaskResult.objects.ready().explain()

if connection.vendor == "postgresql":
self.assertIn("django_task_new_ordering_idx", plan)
elif connection.vendor == "sqlite":
self.assertIn("USING INDEX django_task_new_ordering_idx", plan)
elif connection.vendor == "mysql":
self.assertIn("Index lookup", plan)
self.assertIn("using django_task_new_ordering_idx", plan)
else:
self.fail("Unknown database engine")


@override_settings(
TASKS={
Expand Down