diff --git a/queue_job_cron_jobrunner/models/queue_job.py b/queue_job_cron_jobrunner/models/queue_job.py
index f6a4c5f36f..86e9f2d04e 100644
--- a/queue_job_cron_jobrunner/models/queue_job.py
+++ b/queue_job_cron_jobrunner/models/queue_job.py
@@ -4,9 +4,11 @@
 import logging
 import traceback
+from collections import defaultdict
 from datetime import datetime
 from io import StringIO

+import psutil
 from psycopg2 import OperationalError

 from odoo import _, api, fields, models, tools
@@ -19,6 +21,7 @@
     RetryableJobError,
 )
 from odoo.addons.queue_job.job import Job
+from odoo.addons.queue_job.jobrunner import QueueJobRunner

 _logger = logging.getLogger(__name__)

@@ -27,14 +30,16 @@ class QueueJob(models.Model):
     _inherit = "queue.job"

     @api.model
-    def _acquire_one_job(self):
+    def _acquire_one_job(self, commit=False):
         """Acquire the next job to be run.

-        :returns: queue.job record (locked for update)
+        The acquired job is marked as started (and committed when requested)
+        so that concurrent runners will not pick it up again.
+
+        :param commit: commit the transaction after marking the job started
+        :returns: queue.job record, or an empty recordset if no job is ready
         """
-        # TODO: This method should respect channel priority and capacity,
-        #       rather than just fetching them by creation date.
-        self.flush()
+        runner = QueueJobRunner.from_environ_or_config()
         self.env.cr.execute(
             """
             SELECT id
@@ -42,32 +47,74 @@ def _acquire_one_job(self):
             WHERE state = 'pending'
                 AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
             ORDER BY priority, date_created
-            LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
+            FOR NO KEY UPDATE
             """
         )
-        row = self.env.cr.fetchone()
-        return self.browse(row and row[0])
+        rows = self.env.cr.fetchall()
+
+        # Count running jobs per channel to find the saturated channels.
+        channels = defaultdict(int)
+        for queue_job in self.search([("state", "=", "started")]):
+            if not queue_job.channel:
+                continue
+            channels[queue_job.channel] += 1
+        channels_without_capacity = set()
+        for channel_str, running in channels.items():
+            channel = runner.channel_manager.get_channel_by_name(
+                channel_str, autocreate=True
+            )
+            if channel.capacity and channel.capacity <= running:
+                channels_without_capacity.add(channel_str)
+        # The "root" channel capacity is expected to be 0 (disabled) so that
+        # the regular jobrunner does not run; never treat it as saturated.
+        channels_without_capacity.discard("root")
+        _logger.info(
+            "_acquire_one_job channels_without_capacity %s",
+            channels_without_capacity,
+        )
+
+        result = self.browse()
+        for row in rows:
+            queue_job = self.browse(row[0])
+            if queue_job.channel and queue_job.channel in channels_without_capacity:
+                continue
+            job = Job._load_from_db_record(queue_job)
+            job.set_started()
+            job.store()
+            _logger.info(
+                "_acquire_one_job queue.job %s[channel=%s,uuid=%s] started",
+                row[0],
+                job.channel,
+                job.uuid,
+            )
+            result = queue_job
+            break
+        self.flush()
+        if commit:  # pragma: no cover
+            self.env.cr.commit()  # pylint: disable=invalid-commit
+        return result

     def _process(self, commit=False):
         """Process the job"""
         self.ensure_one()
         job = Job._load_from_db_record(self)
-        # Set it as started
-        job.set_started()
-        job.store()
-        _logger.debug("%s started", job.uuid)
-        # TODO: Commit the state change so that the state can be read from the UI
-        # while the job is processing. However, doing this will release the
-        # lock on the db, so we need to find another way.
-        # if commit:
-        #     self.flush()
-        #     self.env.cr.commit()
-        # Actual processing
         try:
             try:
                 with self.env.cr.savepoint():
+                    _logger.info(
+                        "perform %s[channel=%s,uuid=%s]",
+                        self.id,
+                        self.channel,
+                        self.uuid,
+                    )
                     job.perform()
+                    _logger.info(
+                        "performed %s[channel=%s,uuid=%s]",
+                        self.id,
+                        self.channel,
+                        self.uuid,
+                    )
                 job.set_done()
                 job.store()
             except OperationalError as err:
@@ -87,13 +134,18 @@ def _process(self, commit=False):
             msg = _("Job interrupted and set to Done: nothing to do.")
             job.set_done(msg)
             job.store()
+            _logger.info(
+                "interrupted %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
+            )

         except RetryableJobError as err:
             # delay the job later, requeue
             job.postpone(result=str(err), seconds=5)
             job.set_pending(reset_retry=False)
             job.store()
-            _logger.debug("%s postponed", job)
+            _logger.info(
+                "postponed %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
+            )

         except (FailedJobError, Exception):
             with StringIO() as buff:
@@ -101,6 +153,9 @@ def _process(self, commit=False):
                 _logger.error(buff.getvalue())
                 job.set_failed(exc_info=buff.getvalue())
                 job.store()
+            _logger.info(
+                "failed %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
+            )

         if commit:  # pragma: no cover
             self.env["base"].flush()
@@ -113,10 +168,11 @@ def _process(self, commit=False):
     @api.model
     def _job_runner(self, commit=True):
         """Short-lived job runner, triggered by async crons"""
-        job = self._acquire_one_job()
+        self._release_started_jobs(commit=commit)
+        job = self._acquire_one_job(commit=commit)
         while job:
             job._process(commit=commit)
-            job = self._acquire_one_job()
+            job = self._acquire_one_job(commit=commit)
         # TODO: If limit_time_real_cron is reached before all the jobs are done,
         # the worker will be killed abruptly.
         # Ideally, find a way to know if we're close to reaching this limit,
@@ -166,6 +222,30 @@ def _ensure_cron_trigger(self):
         if delayed_etas:
             self._cron_trigger(at=list(delayed_etas))

+    @api.model
+    def _release_started_jobs(self, commit=False):
+        """Requeue started jobs whose worker process has died.
+
+        Jobs left in "started" state by a killed worker would otherwise
+        occupy their channel's capacity forever.
+        """
+        # PIDs of all processes alive on this host (workers are assumed local)
+        pids = [x.pid for x in psutil.process_iter()]
+        for record in self.search(
+            [("state", "=", "started"), ("worker_pid", "not in", pids)]
+        ):
+            job = Job._load_from_db_record(record)
+            job.set_pending()
+            job.store()
+            _logger.info(
+                "release started job %s[channel=%s,uuid=%s]",
+                record.id,
+                record.channel,
+                record.uuid,
+            )
+        if commit:  # pragma: no cover
+            self.env.cr.commit()  # pylint: disable=invalid-commit
+
     @api.model_create_multi
     def create(self, vals_list):
         # When jobs are created, also create the cron trigger
diff --git a/queue_job_cron_jobrunner/tests/test_queue_job.py b/queue_job_cron_jobrunner/tests/test_queue_job.py
index 5797d62e80..0592fcce75 100644
--- a/queue_job_cron_jobrunner/tests/test_queue_job.py
+++ b/queue_job_cron_jobrunner/tests/test_queue_job.py
@@ -3,6 +3,7 @@
 # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
 from datetime import datetime
+from unittest import mock

 from freezegun import freeze_time

@@ -10,6 +11,8 @@
 from odoo.tests.common import TransactionCase
 from odoo.tools import mute_logger

+from odoo.addons.queue_job.jobrunner import QueueJobRunner
+

 class TestQueueJob(TransactionCase):
     def setUp(self):
@@ -56,7 +59,6 @@ def test_queue_job_cron_trigger_enqueue_dependencies(self):
         # if the state is "waiting_dependencies", it means the "enqueue_waiting()"
         # step has not been done when the parent job has been done
         self.assertEqual(job_record_depends.state, "done", "Processed OK")
-        self.assertEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))

     @freeze_time("2022-02-22 22:22:22")
     def test_concurrent_cron_access(self):
@@ -70,20 +72,7 @@ def test_concurrent_cron_access(self):
             (self.cron.id,),
             log_exceptions=False,
         )
-
-        delayable = self.env["res.partner"].delayable().create({"name": "test"})
-        delayable2 = self.env["res.partner"].delayable().create({"name": "test2"})
-        delayable.on_done(delayable2)
-        delayable.delay()
-        job_record = delayable._generated_job.db_record()
-        job_record_depends = delayable2._generated_job.db_record()
-
-        self.env["queue.job"]._job_runner(commit=False)
-
-        self.assertEqual(job_record.state, "done", "Processed OK")
-        # if the state is "waiting_dependencies", it means the "enqueue_waiting()"
-        # step has not been done when the parent job has been done
-        self.assertEqual(job_record_depends.state, "done", "Processed OK")
+        self.env["res.partner"].with_delay().create({"name": "test"})
         self.assertNotEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))

     def test_acquire_one_job_use_priority(self):
@@ -98,7 +87,9 @@ def test_acquire_one_job_use_priority(self):
         with freeze_time("2024-01-01 10:03:01"):
             self.env["res.partner"].with_delay(priority=2).create({"name": "test"})

-        self.assertEqual(self.env["queue.job"]._acquire_one_job(), job.db_record())
+        self.assertEqual(
+            self.env["queue.job"]._acquire_one_job(commit=False), job.db_record()
+        )

     def test_acquire_one_job_consume_the_oldest_first(self):
         with freeze_time("2024-01-01 10:01:01"):
@@ -112,4 +103,91 @@ def test_acquire_one_job_consume_the_oldest_first(self):
         with freeze_time("2024-01-01 10:03:01"):
             self.env["res.partner"].with_delay(priority=30).create({"name": "test"})

-        self.assertEqual(self.env["queue.job"]._acquire_one_job(), job.db_record())
+        self.assertEqual(
+            self.env["queue.job"]._acquire_one_job(commit=False), job.db_record()
+        )
+
+    def test_acquire_one_job_starts_job(self):
+        job = self.env["res.partner"].with_delay(priority=1).create({"name": "test"})
+
+        result = self.env["queue.job"]._acquire_one_job(commit=False)
+
+        self.assertEqual(result, job.db_record())
+        self.assertEqual(job.db_record().state, "started")
+
+    def test_acquire_one_job_do_not_overload_channel(self):
+        runner = QueueJobRunner.from_environ_or_config()
+        runner.channel_manager.get_channel_by_name(
+            "root.foobar", autocreate=True
+        ).capacity = 2
+        job1 = (
+            self.env["res.partner"]
+            .with_delay(channel="root.foobar")
+            .create({"name": "test1"})
+        )
+        job2 = (
+            self.env["res.partner"]
+            .with_delay(channel="root.foobar")
+            .create({"name": "test2"})
+        )
+        self.env["res.partner"].with_delay(channel="root.foobar").create(
+            {"name": "test3"}
+        )
+
+        with mock.patch.object(
+            QueueJobRunner, "from_environ_or_config", return_value=runner
+        ):
+            first_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
+            second_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
+            third_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
self.env["queue.job"]._acquire_one_job(commit=False) + + self.assertEqual(first_acquired_job, job1.db_record()) + self.assertEqual(second_acquired_job, job2.db_record()) + self.assertEqual(third_acquired_job, self.env["queue.job"].browse()) + + def test_acquire_one_job_root_capacity_ignored(self): + runner = QueueJobRunner.from_environ_or_config() + runner.channel_manager.get_channel_by_name("root", autocreate=True).capacity = 0 + job1 = ( + self.env["res.partner"].with_delay(channel="root").create({"name": "test1"}) + ) + job2 = ( + self.env["res.partner"].with_delay(channel="root").create({"name": "test2"}) + ) + job3 = ( + self.env["res.partner"].with_delay(channel="root").create({"name": "test3"}) + ) + + with mock.patch.object( + QueueJobRunner, "from_environ_or_config", return_value=runner + ): + first_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False) + second_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False) + third_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False) + + self.assertEqual(first_acquired_job, job1.db_record()) + self.assertEqual(second_acquired_job, job2.db_record()) + self.assertEqual(third_acquired_job, job3.db_record()) + + @freeze_time("2022-02-22 22:22:22") + def test_queue_job_creation_create_change_next_call(self): + self.cron.nextcall = datetime(2021, 1, 21, 21, 21, 21) + self.env["res.partner"].with_delay().create({"name": "test"}) + self.assertNotEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22)) + + def test_release_started_jobs(self): + job_known_pid = self.env["res.partner"].with_delay().create({"name": "test"}) + job_known_pid.set_started() + job_known_pid.store() + known_pid = job_known_pid.db_record().worker_pid + job_unknown_pid = self.env["res.partner"].with_delay().create({"name": "test"}) + job_unknown_pid.set_started() + job_unknown_pid.store() + job_unknown_pid.db_record().worker_pid = -1 + + self.env["queue.job"]._release_started_jobs(commit=False) + + self.assertEqual(job_unknown_pid.db_record().state, "pending") + self.assertEqual(job_unknown_pid.db_record().worker_pid, 0) + self.assertEqual(job_known_pid.db_record().state, "started") + self.assertEqual(job_known_pid.db_record().worker_pid, known_pid)