From 6bbad8d6096b5331f28fbbb175e695d686c7ae5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 22 Nov 2025 11:52:55 +0100 Subject: [PATCH 01/13] [DOC] queue_job: remove dead jobs caveat from roadmap The automatic dead jobs requeuer now works out of the box. --- queue_job/readme/ROADMAP.md | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/queue_job/readme/ROADMAP.md b/queue_job/readme/ROADMAP.md index a13be6beb3..df33142b88 100644 --- a/queue_job/readme/ROADMAP.md +++ b/queue_job/readme/ROADMAP.md @@ -1,17 +1,2 @@ - After creating a new database or installing `queue_job` on an existing database, Odoo must be restarted for the runner to detect it. -- When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - `started` or `enqueued` state after the Odoo server is halted. Since - the runner has no way to know if they are actually running or not, and - does not know for sure if it is safe to restart the jobs, it does not - attempt to restart them automatically. Such stale jobs therefore fill - the running queue and prevent other jobs to start. 
You must therefore - requeue them manually, either from the Jobs view, or by running the - following SQL statement *before starting Odoo*: - -``` sql -update queue_job set state='pending' where state in ('started', 'enqueued') -``` From 6ef202f7bf7cd6a5ae426ce4b505a1233d9e2eaf Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Mon, 2 Jun 2025 14:51:45 +0200 Subject: [PATCH 02/13] [IMP] queue_job: prevent invalid change of job status --- queue_job/models/queue_job.py | 17 ++++-- queue_job/tests/test_wizards.py | 57 ++++++++++++++++++++ queue_job/wizards/queue_jobs_to_cancelled.py | 6 +-- queue_job/wizards/queue_jobs_to_done.py | 2 + queue_job/wizards/queue_requeue_job.py | 1 + 5 files changed, 76 insertions(+), 7 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index b1a5dcaf7b..283e1a1152 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -17,6 +17,7 @@ from ..job import ( CANCELLED, DONE, + ENQUEUED, FAILED, PENDING, STARTED, @@ -328,18 +329,26 @@ def _change_job_state(self, state, result=None): raise ValueError(msg) def button_done(self): + # If job was set to STARTED or CANCELLED, do not set it to DONE + states_from = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, FAILED) result = self.env._("Manually set to done by %s", self.env.user.name) - self._change_job_state(DONE, result=result) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(DONE, result=result) return True def button_cancelled(self): + # If job was set to DONE or WAIT_DEPENDENCIES, do not cancel it + states_from = (PENDING, ENQUEUED, FAILED) result = self.env._("Cancelled by %s", self.env.user.name) - self._change_job_state(CANCELLED, result=result) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(CANCELLED, result=result) return True def requeue(self): - jobs_to_requeue = self.filtered(lambda job_: job_.state != WAIT_DEPENDENCIES) - 
jobs_to_requeue._change_job_state(PENDING) + # If job is already in queue or started, do not requeue it + states_from = (FAILED, DONE, CANCELLED) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(PENDING) return True def _message_post_on_failure(self): diff --git a/queue_job/tests/test_wizards.py b/queue_job/tests/test_wizards.py index 2ac162d313..7738836d2f 100644 --- a/queue_job/tests/test_wizards.py +++ b/queue_job/tests/test_wizards.py @@ -46,3 +46,60 @@ def test_03_done(self): wizard = self._wizard("queue.jobs.to.done") wizard.set_done() self.assertEqual(self.job.state, "done") + + def test_04_requeue_forbidden(self): + wizard = self._wizard("queue.requeue.job") + + # State WAIT_DEPENDENCIES is not requeued + self.job.state = "wait_dependencies" + wizard.requeue() + self.assertEqual(self.job.state, "wait_dependencies") + + # State PENDING, ENQUEUED or STARTED are ignored too + for test_state in ("pending", "enqueued", "started"): + self.job.state = test_state + wizard.requeue() + self.assertEqual(self.job.state, test_state) + + # States CANCELLED, DONE or FAILED will change status + self.job.state = "cancelled" + wizard.requeue() + self.assertEqual(self.job.state, "pending") + + def test_05_cancel_forbidden(self): + wizard = self._wizard("queue.jobs.to.cancelled") + + # State WAIT_DEPENDENCIES is not cancelled + self.job.state = "wait_dependencies" + wizard.set_cancelled() + self.assertEqual(self.job.state, "wait_dependencies") + + # State DONE is not cancelled + self.job.state = "done" + wizard.set_cancelled() + self.assertEqual(self.job.state, "done") + + # State PENDING, ENQUEUED or FAILED will be cancelled + for test_state in ("pending", "enqueued"): + self.job.state = test_state + wizard.set_cancelled() + self.assertEqual(self.job.state, "cancelled") + + def test_06_done_forbidden(self): + wizard = self._wizard("queue.jobs.to.done") + + # State STARTED is not set DONE manually + self.job.state = "started" + 
wizard.set_done() + self.assertEqual(self.job.state, "started") + + # State CANCELLED is not cancelled + self.job.state = "cancelled" + wizard.set_done() + self.assertEqual(self.job.state, "cancelled") + + # State WAIT_DEPENDENCIES, PENDING, ENQUEUED or FAILED will be set to DONE + for test_state in ("wait_dependencies", "pending", "enqueued"): + self.job.state = test_state + wizard.set_done() + self.assertEqual(self.job.state, "done") diff --git a/queue_job/wizards/queue_jobs_to_cancelled.py b/queue_job/wizards/queue_jobs_to_cancelled.py index 9e73374ebd..bb9f831576 100644 --- a/queue_job/wizards/queue_jobs_to_cancelled.py +++ b/queue_job/wizards/queue_jobs_to_cancelled.py @@ -10,8 +10,8 @@ class SetJobsToCancelled(models.TransientModel): _description = "Cancel all selected jobs" def set_cancelled(self): - jobs = self.job_ids.filtered( - lambda x: x.state in ("pending", "failed", "enqueued") - ) + # Only jobs with state PENDING, FAILED, ENQUEUED + # will change to CANCELLED + jobs = self.job_ids jobs.button_cancelled() return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_jobs_to_done.py b/queue_job/wizards/queue_jobs_to_done.py index ff1366ffed..caf8129213 100644 --- a/queue_job/wizards/queue_jobs_to_done.py +++ b/queue_job/wizards/queue_jobs_to_done.py @@ -10,6 +10,8 @@ class SetJobsToDone(models.TransientModel): _description = "Set all selected jobs to done" def set_done(self): + # Only jobs with state WAIT_DEPENDENCIES, PENDING, ENQUEUED or FAILED + # will change to DONE jobs = self.job_ids jobs.button_done() return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_requeue_job.py b/queue_job/wizards/queue_requeue_job.py index 67d2ffcbdc..a88256300f 100644 --- a/queue_job/wizards/queue_requeue_job.py +++ b/queue_job/wizards/queue_requeue_job.py @@ -20,6 +20,7 @@ def _default_job_ids(self): ) def requeue(self): + # Only jobs with state FAILED, DONE or CANCELLED will change to PENDING jobs = self.job_ids 
jobs.requeue() return {"type": "ir.actions.act_window_close"} From 4ab5fbb96fb03fb5ea7aa5e282ec1198939e5e1f Mon Sep 17 00:00:00 2001 From: hoangtrann Date: Sat, 22 Nov 2025 06:05:08 +0700 Subject: [PATCH 03/13] [IMP] queue_job: requeue orphaned jobs --- queue_job/jobrunner/runner.py | 37 +++++++++++++++++++ test_queue_job/tests/test_requeue_dead_job.py | 22 +++++++++++ 2 files changed, 59 insertions(+) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index b8cfe979cc..0f426a7151 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -378,6 +378,35 @@ def _query_requeue_dead_jobs(self): RETURNING uuid """ + def _query_requeue_orphaned_jobs(self): + """Query to requeue jobs stuck in 'enqueued' state without a lock. + + This handles the edge case where the runner marks a job as 'enqueued' + but the HTTP request to start the job never reaches the Odoo server + (e.g., due to server shutdown/crash between setting enqueued and + the controller receiving the request). These jobs have no lock record + because set_started() was never called, so they are invisible to + _query_requeue_dead_jobs(). 
+ """ + return """ + UPDATE + queue_job + SET + state='pending' + WHERE + state = 'enqueued' + AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + AND NOT EXISTS ( + SELECT + 1 + FROM + queue_job_lock + WHERE + queue_job_id = queue_job.id + ) + RETURNING uuid + """ + def requeue_dead_jobs(self): """ Set started and enqueued jobs but not locked to pending @@ -406,6 +435,14 @@ def requeue_dead_jobs(self): for (uuid,) in cr.fetchall(): _logger.warning("Re-queued dead job with uuid: %s", uuid) + # Requeue orphaned jobs (enqueued but never started, no lock) + query = self._query_requeue_orphaned_jobs() + cr.execute(query) + for (uuid,) in cr.fetchall(): + _logger.warning( + "Re-queued orphaned job (enqueued without lock) with uuid: %s", uuid + ) + class QueueJobRunner: def __init__( diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index a6328fed76..c90feaa97f 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -99,3 +99,25 @@ def test_requeue_dead_jobs(self): uuids_requeued = self.env.cr.fetchall() self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) + + def test_requeue_orphaned_jobs(self): + queue_job = self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) + + # Only enqueued job, don't set it to started to simulate the scenario + # that system shutdown before job is starting + job_obj.set_enqueued() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() + + # job ins't actually picked up by the first requeue attempt + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + uuids_requeued = self.env.cr.fetchall() + self.assertFalse(uuids_requeued) + + # job is picked up by the 2nd requeue attempt + query = Database(self.env.cr.dbname)._query_requeue_orphaned_jobs() + self.env.cr.execute(query) + uuids_requeued = 
self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) From d1bbd0c28621df5beca200c339f71c6784f3d52c Mon Sep 17 00:00:00 2001 From: hoangtrann Date: Wed, 31 Dec 2025 19:27:16 +0700 Subject: [PATCH 04/13] [IMP] queue_job: query orphaned dead job not exist in lock table --- queue_job/jobrunner/runner.py | 78 ++++++------------- test_queue_job/tests/test_requeue_dead_job.py | 8 +- 2 files changed, 26 insertions(+), 60 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 0f426a7151..dfcd9db040 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -357,52 +357,26 @@ def _query_requeue_dead_jobs(self): ELSE exc_info END) WHERE - id in ( - SELECT - queue_job_id - FROM - queue_job_lock - WHERE - queue_job_id in ( - SELECT - id - FROM - queue_job - WHERE - state IN ('enqueued','started') - AND date_enqueued < - (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') - ) - FOR UPDATE SKIP LOCKED - ) - RETURNING uuid - """ - - def _query_requeue_orphaned_jobs(self): - """Query to requeue jobs stuck in 'enqueued' state without a lock. - - This handles the edge case where the runner marks a job as 'enqueued' - but the HTTP request to start the job never reaches the Odoo server - (e.g., due to server shutdown/crash between setting enqueued and - the controller receiving the request). These jobs have no lock record - because set_started() was never called, so they are invisible to - _query_requeue_dead_jobs(). 
- """ - return """ - UPDATE - queue_job - SET - state='pending' - WHERE - state = 'enqueued' + state IN ('enqueued','started') AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') - AND NOT EXISTS ( - SELECT - 1 - FROM - queue_job_lock - WHERE - queue_job_id = queue_job.id + AND ( + id in ( + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + FOR UPDATE SKIP LOCKED + ) + OR NOT EXISTS ( + SELECT + 1 + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + ) ) RETURNING uuid """ @@ -425,6 +399,12 @@ def requeue_dead_jobs(self): However, when the Odoo server crashes or is otherwise force-stopped, running jobs are interrupted while the runner has no chance to know they have been aborted. + + This also handles orphaned jobs (enqueued but never started, no lock). + This edge case occurs when the runner marks a job as 'enqueued' + but the HTTP request to start the job never reaches the Odoo server + (e.g., due to server shutdown/crash between setting enqueued and + the controller receiving the request). 
""" with closing(self.conn.cursor()) as cr: @@ -435,14 +415,6 @@ def requeue_dead_jobs(self): for (uuid,) in cr.fetchall(): _logger.warning("Re-queued dead job with uuid: %s", uuid) - # Requeue orphaned jobs (enqueued but never started, no lock) - query = self._query_requeue_orphaned_jobs() - cr.execute(query) - for (uuid,) in cr.fetchall(): - _logger.warning( - "Re-queued orphaned job (enqueued without lock) with uuid: %s", uuid - ) - class QueueJobRunner: def __init__( diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index c90feaa97f..58890adf24 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -110,14 +110,8 @@ def test_requeue_orphaned_jobs(self): job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) job_obj.store() - # job ins't actually picked up by the first requeue attempt + # job is now picked up by the requeue query (which includes orphaned jobs) query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() self.env.cr.execute(query) uuids_requeued = self.env.cr.fetchall() - self.assertFalse(uuids_requeued) - - # job is picked up by the 2nd requeue attempt - query = Database(self.env.cr.dbname)._query_requeue_orphaned_jobs() - self.env.cr.execute(query) - uuids_requeued = self.env.cr.fetchall() self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) From 3eaa8a8d8b4d006c35a06210890ad415e1a5ebc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 13:54:38 +0100 Subject: [PATCH 05/13] [IMP] queue_job: add job_duration parameter to test job This allows creating test job with a long duration for stress testing. 
--- queue_job/controllers/main.py | 15 +++++++++++++-- queue_job/models/queue_job.py | 5 ++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index da0a21c701..9c4f802a86 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -177,6 +177,7 @@ def create_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(http.request.env._("Access Denied")) @@ -187,6 +188,12 @@ def create_test_job( except (ValueError, TypeError): failure_rate = 0 + if job_duration is not None: + try: + job_duration = float(job_duration) + except (ValueError, TypeError): + job_duration = 0 + if not (0 <= failure_rate <= 1): raise BadRequest("failure_rate must be between 0 and 1") @@ -215,6 +222,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) if size > 1: @@ -225,6 +233,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) return "" @@ -236,6 +245,7 @@ def _create_single_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): delayed = ( http.request.env["queue.job"] @@ -245,7 +255,7 @@ def _create_single_test_job( channel=channel, description=description, ) - ._test_job(failure_rate=failure_rate) + ._test_job(failure_rate=failure_rate, job_duration=job_duration) ) return f"job uuid: {delayed.db_record().uuid}" @@ -259,6 +269,7 @@ def _create_graph_test_jobs( channel=None, description="Test job", failure_rate=0, + job_duration=0, ): model = http.request.env["queue.job"] current_count = 0 @@ -281,7 +292,7 @@ def _create_graph_test_jobs( max_retries=max_retries, channel=channel, description=f"{description} #{current_count}", - )._test_job(failure_rate=failure_rate) + )._test_job(failure_rate=failure_rate, 
job_duration=job_duration) ) grouping = random.choice(possible_grouping_methods) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 283e1a1152..ce5f3b912e 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -3,6 +3,7 @@ import logging import random +import time from datetime import datetime, timedelta from odoo import api, exceptions, fields, models @@ -456,7 +457,9 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0): + def _test_job(self, failure_rate=0, job_duration=0): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") + if job_duration: + time.sleep(job_duration) From 0ca476d6b198719f2a0def75de90b9d615406846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 15:00:27 +0100 Subject: [PATCH 06/13] [FIX] queue_job: fix retry mechanism for job dependencies When a SerializationFailure occurs when updating the state of dependent jobs, the cursor is not usable anymore so the retry failed with `current transaction is aborted`. A savepoint fixes that. 
--- queue_job/controllers/main.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 9c4f802a86..84100ccce5 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -50,14 +50,15 @@ def _enqueue_dependent_jobs(self, env, job): tries = 0 while True: try: - job.enqueue_waiting() + with job.env.cr.savepoint(): + job.enqueue_waiting() except OperationalError as err: # Automatically retry the typical transaction serialization # errors if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE: - _logger.info( + _logger.error( "%s, maximum number of tries reached to update dependencies", errorcodes.lookup(err.pgcode), ) From ceabeb3bf4396189856f757c86acd54b61e21c5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 19:36:25 +0100 Subject: [PATCH 07/13] [FIX] queue_job: set exec_time readonly --- queue_job/models/queue_job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index ce5f3b912e..20eeaf02b3 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -103,6 +103,7 @@ class QueueJob(models.Model): date_done = fields.Datetime(readonly=True) exec_time = fields.Float( string="Execution Time (avg)", + readonly=True, aggregator="avg", help="Time required to execute this job in seconds. 
Average when grouped.", ) From e894ca94012d630064e2b48a752a417b8857b23f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:34:18 +0100 Subject: [PATCH 08/13] [IMP] queue_job: use state constant in lock function --- queue_job/job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index 48a7561553..f5de1233f2 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -264,11 +264,11 @@ def lock(self): queue_job WHERE uuid = %s - AND state='started' + AND state = %s ) FOR UPDATE; """, - [self.uuid], + [self.uuid, STARTED], ) # 1 job should be locked From 2a2c209201554d5bb6ed9f44cbdd3d57243b3e11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 12:18:38 +0100 Subject: [PATCH 09/13] [IMP] queue_job: refactor job acquisition In this commit we cleanly separate the job acquisition (i.e. verifying the job is in the expected state, marking it started and locking it) from job execution. We also avoid trying to start the job if it is already locked by using SKIP LOCKED and exiting early. Indeed in such situations the job is likely already being handled by another worker so there is no point trying to start it, so we exit early and let it be handled either by the other worker or the dead job requeuer. 
--- queue_job/controllers/main.py | 57 ++++++++++++------- queue_job/job.py | 19 +++---- test_queue_job/tests/__init__.py | 1 + test_queue_job/tests/common.py | 10 ++++ test_queue_job/tests/test_acquire_job.py | 51 +++++++++++++++++ test_queue_job/tests/test_requeue_dead_job.py | 17 ------ 6 files changed, 107 insertions(+), 48 deletions(-) create mode 100644 test_queue_job/tests/test_acquire_job.py diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 84100ccce5..b8012d14c1 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -27,15 +27,47 @@ class RunJobController(http.Controller): - def _try_perform_job(self, env, job): - """Try to perform the job.""" + @classmethod + def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: + """Acquire a job for execution. + + - make sure it is in ENQUEUED state + - mark it as STARTED and commit the state change + - acquire the job lock + + If successful, return the Job instance, otherwise return None. This + function may fail to acquire the job is not in the expected state or is + already locked by another worker. 
+ """ + env.cr.execute( + "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " + "FOR UPDATE SKIP LOCKED", + (job_uuid, ENQUEUED), + ) + if not env.cr.fetchone(): + _logger.warning( + "was requested to run job %s, but it does not exist, " + "or is not in state %s, or is being handled by another worker", + job_uuid, + ENQUEUED, + ) + return None + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED job.set_started() job.store() env.cr.commit() - job.lock() + if not job.lock(): + _logger.warning( + "was requested to run job %s, but it could not be locked", + job_uuid, + ) + return None + return job + def _try_perform_job(self, env, job): + """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() # Triggers any stored computed fields before calling 'set_done' # so that will be part of the 'exec_time' @@ -95,23 +127,10 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - # ensure the job to run is in the correct state and lock the record - env.cr.execute( - "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE", - (job_uuid, ENQUEUED), - ) - if not env.cr.fetchone(): - _logger.warning( - "was requested to run job %s, but it does not exist, " - "or is not in state %s", - job_uuid, - ENQUEUED, - ) + job = self._acquire_job(env, job_uuid) + if not job: return "" - job = Job.load(env, job_uuid) - assert job and job.state == ENQUEUED - try: try: self._try_perform_job(env, job) diff --git a/queue_job/job.py b/queue_job/job.py index f5de1233f2..750fda0a80 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -222,7 +222,7 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} - def add_lock_record(self): + def add_lock_record(self) -> None: """ Create row in db to be locked while the job is being performed. 
""" @@ -242,13 +242,11 @@ def add_lock_record(self): [self.uuid], ) - def lock(self): - """ - Lock row of job that is being performed + def lock(self) -> bool: + """Lock row of job that is being performed. - If a job cannot be locked, - it means that the job wasn't started, - a RetryableJobError is thrown. + Return False if a job cannot be locked: it means that the job is not in + STARTED state or is already locked by another worker. """ self.env.cr.execute( """ @@ -266,16 +264,13 @@ def lock(self): uuid = %s AND state = %s ) - FOR UPDATE; + FOR UPDATE SKIP LOCKED; """, [self.uuid, STARTED], ) # 1 job should be locked - if 1 != len(self.env.cr.fetchall()): - raise RetryableJobError( - f"Trying to lock job that wasn't started, uuid: {self.uuid}" - ) + return bool(self.env.cr.fetchall()) @classmethod def _load_from_db_record(cls, job_db_record): diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 62347148e5..0cfacebdf3 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,3 +1,4 @@ +from . import test_acquire_job from . import test_autovacuum from . import test_delayable from . 
import test_dependencies diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py index 335c072625..d3173a2198 100644 --- a/test_queue_job/tests/common.py +++ b/test_queue_job/tests/common.py @@ -20,3 +20,13 @@ def _create_job(self): stored = Job.db_records_from_uuids(self.env, [test_job.uuid]) self.assertEqual(len(stored), 1) return stored + + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1) + self.assertTrue( + job, + f"Demo data queue job {uuid!r} should be loaded in order " + "to make this test work", + ) + return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py new file mode 100644 index 0000000000..3f0c92a2be --- /dev/null +++ b/test_queue_job/tests/test_acquire_job.py @@ -0,0 +1,51 @@ +# Copyright 2026 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.queue_job.controllers.main import RunJobController + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def test_acquire_enqueued_job(self): + job_record = self._get_demo_job(uuid="test_enqueued_job") + self.assertFalse( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)], + ), + "A job lock record should not exist at this point", + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job") + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertEqual(job.uuid, "test_enqueued_job") + self.assertEqual(job.state, "started") + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)] + ), + "A job lock record should exist at this point", + ) + + 
def test_acquire_started_job(self): + with ( + mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit, + self.assertLogs(level=logging.WARNING) as logs, + ): + job = RunJobController._acquire_job(self.env, "test_started_job") + mock_commit.assert_not_called() + self.assertIsNone(job) + self.assertIn( + "was requested to run job test_started_job, but it does not exist", + logs.output[0], + ) diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index 58890adf24..510276be63 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -13,23 +13,6 @@ @tagged("post_install", "-at_install") class TestRequeueDeadJob(JobCommonCase): - def _get_demo_job(self, uuid): - # job created during load of demo data - job = self.env["queue.job"].search( - [ - ("uuid", "=", uuid), - ], - limit=1, - ) - - self.assertTrue( - job, - f"Demo data queue job {uuid} should be loaded in order" - " to make this tests work", - ) - - return job - def get_locks(self, uuid, cr=None): """ Retrieve lock rows From 26466d3d1591be3c95c59cc0216f8ef86ea4f41c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:26:02 +0100 Subject: [PATCH 10/13] [IMP] queue_job: refactor runjob Extract the logic to run one job out of the /queue_job/runjob route. Towards making this logic reusable in other job executors. 
--- queue_job/controllers/main.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index b8012d14c1..76a88e1ecd 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -108,17 +108,7 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route( - "/queue_job/runjob", - type="http", - auth="none", - save_session=False, - readonly=False, - ) - def runjob(self, db, job_uuid, **kw): - http.request.session.db = db - env = http.request.env(user=SUPERUSER_ID) - + def _runjob(self, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: @@ -127,10 +117,6 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - job = self._acquire_job(env, job_uuid) - if not job: - return "" - try: try: self._try_perform_job(env, job) @@ -151,7 +137,6 @@ def retry_postpone(job, message, seconds=None): # traceback in the logs we should have the traceback when all # retries are exhausted env.cr.rollback() - return "" except (FailedJobError, Exception) as orig_exception: buff = StringIO() @@ -171,8 +156,6 @@ def retry_postpone(job, message, seconds=None): self._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - return "" - def _get_failure_values(self, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ @@ -187,6 +170,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception): "exc_message": exc_message, } + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) + def runjob(self, db, job_uuid, **kw): + http.request.session.db = db + env = http.request.env(user=SUPERUSER_ID) + job = self._acquire_job(env, job_uuid) + if not 
job: + return "" + self._runjob(env, job) + return "" + # flake8: noqa: C901 @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( From 4bf3d6148265c6b5917521aa3837ea9006a671c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:39:13 +0100 Subject: [PATCH 11/13] [IMP] queue_job: convert job execution logic to class method Towards making this logic reusable. --- queue_job/controllers/main.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 76a88e1ecd..7faf46d03b 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -65,7 +65,8 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: return None return job - def _try_perform_job(self, env, job): + @classmethod + def _try_perform_job(cls, env, job): """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) job.perform() @@ -78,7 +79,8 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug("%s done", job) - def _enqueue_dependent_jobs(self, env, job): + @classmethod + def _enqueue_dependent_jobs(cls, env, job): tries = 0 while True: try: @@ -108,7 +110,8 @@ def _enqueue_dependent_jobs(self, env, job): else: break - def _runjob(self, env: api.Environment, job: Job) -> None: + @classmethod + def _runjob(cls, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as new_cr: @@ -119,7 +122,7 @@ def retry_postpone(job, message, seconds=None): try: try: - self._try_perform_job(env, job) + cls._try_perform_job(env, job) except OperationalError as err: # Automatically retry the typical transaction serialization # errors @@ -146,17 +149,18 @@ def retry_postpone(job, message, seconds=None): job.env.clear() with Registry(job.env.cr.dbname).cursor() as 
new_cr: job.env = job.env(cr=new_cr) - vals = self._get_failure_values(job, traceback_txt, orig_exception) + vals = cls._get_failure_values(job, traceback_txt, orig_exception) job.set_failed(**vals) job.store() buff.close() raise _logger.debug("%s enqueue depends started", job) - self._enqueue_dependent_jobs(env, job) + cls._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - def _get_failure_values(self, job, traceback_txt, orig_exception): + @classmethod + def _get_failure_values(cls, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ if hasattr(orig_exception, "__module__"): From d2dc621c68694592cade110e12b49b21f33b0aab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 15:12:40 +0100 Subject: [PATCH 12/13] queue_job: declare sbidoul as maintainer --- queue_job/__manifest__.py | 2 +- test_queue_job/__manifest__.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 01e6a89015..2d74ea3fc1 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -29,7 +29,7 @@ }, "installable": True, "development_status": "Mature", - "maintainers": ["guewen"], + "maintainers": ["guewen", "sbidoul"], "post_init_hook": "post_init_hook", "post_load": "post_load", } diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index 3cf7243aa7..98b8a0d485 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -15,5 +15,6 @@ "security/ir.model.access.csv", "data/queue_job_test_job.xml", ], + "maintainers": ["sbidoul"], "installable": True, } From 16bb31b1bd5e93b6267a4542aa2c4090280f3b45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sun, 4 Jan 2026 13:38:48 +0100 Subject: [PATCH 13/13] [IMP] queue_job: take weaker locks Since we are not going to delete records nor modify foreign 
keys, we can take a weaker lock. --- queue_job/controllers/main.py | 2 +- queue_job/job.py | 2 +- queue_job/jobrunner/runner.py | 2 +- test_queue_job/tests/test_requeue_dead_job.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 7faf46d03b..4a039d8eba 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -41,7 +41,7 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: """ env.cr.execute( "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " - "FOR UPDATE SKIP LOCKED", + "FOR NO KEY UPDATE SKIP LOCKED", (job_uuid, ENQUEUED), ) if not env.cr.fetchone(): diff --git a/queue_job/job.py b/queue_job/job.py index 750fda0a80..3eca2d2661 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -264,7 +264,7 @@ def lock(self) -> bool: uuid = %s AND state = %s ) - FOR UPDATE SKIP LOCKED; + FOR NO KEY UPDATE SKIP LOCKED; """, [self.uuid, STARTED], ) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index dfcd9db040..681d03fadf 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -367,7 +367,7 @@ def _query_requeue_dead_jobs(self): queue_job_lock WHERE queue_job_lock.queue_job_id = queue_job.id - FOR UPDATE SKIP LOCKED + FOR NO KEY UPDATE SKIP LOCKED ) OR NOT EXISTS ( SELECT diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index 510276be63..a267c43c87 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -35,7 +35,7 @@ def get_locks(self, uuid, cr=None): WHERE uuid = %s ) - FOR UPDATE SKIP LOCKED + FOR NO KEY UPDATE SKIP LOCKED """, [uuid], )