62 changes: 32 additions & 30 deletions base_bg/models/bg_job.py
@@ -171,20 +171,18 @@ def run(self):
Executes the job
"""
self.ensure_one()
if self.state != "enqueued":
raise UserError(_("Only enqueued jobs can be executed"))
if self.state != "running":
raise UserError(_("Only running jobs can be executed"))

self.start()
self.env.cr.commit() # pylint: disable=invalid-commit

try:
context = self.context_json or {}
context = dict(self.context_json or {})
context.update({"bg_job": True, "bg_job_id": self.id})

# Extract record IDs if present in kwargs or args
model = self.env[self.model]
args = self.args_json or []
kwargs = self.kwargs_json or {}
args = list(self.args_json or [])
kwargs = dict(self.kwargs_json or {})
record_ids = kwargs.pop("_record_ids", [])
records = model.browse(record_ids).with_context(**context).with_user(self.create_uid)

@@ -193,11 +191,12 @@ def run(self):
self.finish()
if result:
self._notify_user(result)
self.env.cr.commit() # pylint: disable=invalid-commit

self.env.cr.commit() # pylint: disable=invalid-commit
except Exception as e:
self.env.cr.rollback() # pylint: disable=invalid-commit
self._handle_job_error(e)
raise
self.env.cr.commit() # pylint: disable=invalid-commit

def enqueue(self, retry: bool = False):
"""Mark the job as enqueued."""
@@ -213,15 +212,6 @@ def enqueue(self, retry: bool = False):
)
self.write(data)

def start(self):
"""Mark the job as running and set the start time."""
self.write(
{
"state": "running",
"start_time": fields.Datetime.now(),
}
)

def finish(self):
"""
Mark the job as done and set the end time.
@@ -288,7 +278,10 @@ def _notify_user(self, result: str):
:param result: The result of the job execution
"""
channel = (
self.env["discuss.channel"].with_user(self.create_uid)._get_or_create_chat([self.create_uid.partner_id.id])
self.env["discuss.channel"]
.sudo()
.with_user(self.create_uid)
._get_or_create_chat([self.create_uid.partner_id.id])
)
partner_root_id = self.env.ref("base.partner_root").id
channel.message_post(
@@ -315,23 +308,32 @@ def _get_next_jobs(self) -> "BgJob":
@api.model
def _get_next_job(self) -> "BgJob":
"""
Get and lock the next available enqueued job atomically.
Uses SELECT FOR UPDATE SKIP LOCKED to safely acquire a job without race conditions.
Claim the next enqueued job atomically.
Uses SELECT FOR UPDATE SKIP LOCKED so concurrent cron workers never grab the same job.
Besides selecting the next job, it also marks it as running and sets its start time.

:return: Returns the locked job or an empty recordset.
:return: The next BgJob record to process, or an empty recordset if none is available
"""
self.env.cr.execute(
"""
SELECT id
FROM bg_job
WHERE state = 'enqueued'
ORDER BY priority ASC, create_date ASC
FOR UPDATE SKIP LOCKED
LIMIT 1;
WITH candidate AS (
SELECT id
FROM bg_job
WHERE state = 'enqueued'
ORDER BY priority, create_date, id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE bg_job j
SET state = 'running',
start_time = NOW()
FROM candidate
WHERE j.id = candidate.id
RETURNING j.id;
"""
)
result = self.env.cr.fetchone()
return self.browse(result[0]) if result else self.env["bg.job"]
row = self.env.cr.fetchone()
return self.browse(row[0]) if row else self.env["bg.job"]

@api.model
def _cron_run_enqueued_jobs(self):
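Note on usage: each call to _get_next_job() now atomically selects one enqueued job with FOR UPDATE SKIP LOCKED and flips it to 'running' in the same statement, so two concurrent cron workers can never claim the same record. The body of _cron_run_enqueued_jobs is collapsed in this diff, so the loop below is only a sketch of how a worker could drain the queue on top of this behaviour; the method name _example_worker_loop is hypothetical and not part of the PR.

@api.model
def _example_worker_loop(self):
    # Sketch only: claim jobs one at a time until the queue is empty.
    job = self._get_next_job()  # empty recordset when nothing is enqueued
    while job:
        try:
            job.run()  # run() now expects the job to already be in state 'running'
        except Exception:
            # Per the diff, run() rolls back and delegates to _handle_job_error()
            # before re-raising; move on so one failing job does not block the rest.
            pass
        job = self._get_next_job()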
4 changes: 3 additions & 1 deletion base_bg/tests/test_bg_job.py
@@ -167,6 +167,7 @@ def test_run_notifies_only_for_truthy_results(self):
name="Runner Job",
model="bg.job",
method="dummy_success_method",
state="running",
kwargs_json={"_record_ids": [target_job.id]},
)

@@ -195,6 +196,7 @@ def test_run_skip_notification_for_falsy_results(self):
name="Skip Notify Runner",
model="bg.job",
method="dummy_false_method",
state="running",
kwargs_json={"_record_ids": [target_job.id]},
)

@@ -367,7 +369,7 @@ def test_job_completion_enqueues_next_job(self):
job1 = self._create_job(
name="First Job",
batch_key=batch_key,
state="enqueued",
state="running",
kwargs_json={"_record_ids": [partner.id]},
)
job2 = self._create_job(
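The test updates above create jobs directly in state 'running' because run() no longer accepts enqueued jobs; claiming and marking is now done by _get_next_job(). A test for that claim step itself might look like the sketch below. It reuses the _create_job helper and dummy_success_method already used in this file, but the test is an illustration under the assumption of a recent Odoo version (flush_all / invalidate_recordset), not code from this PR.

def test_get_next_job_claims_and_marks_running(self):
    job = self._create_job(
        name="Claim Job",
        model="bg.job",
        method="dummy_success_method",
        state="enqueued",
    )
    # _get_next_job() runs raw SQL, so flush pending ORM writes first and
    # invalidate the cache afterwards to read the values the UPDATE wrote.
    self.env.flush_all()
    claimed = self.env["bg.job"]._get_next_job()
    job.invalidate_recordset()
    self.assertEqual(claimed, job)
    self.assertEqual(claimed.state, "running")
    self.assertTrue(claimed.start_time)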