diff --git a/base_export_async/tests/test_base_export_async.py b/base_export_async/tests/test_base_export_async.py
index c6909ebbd0..be6a26c61b 100644
--- a/base_export_async/tests/test_base_export_async.py
+++ b/base_export_async/tests/test_base_export_async.py
@@ -42,7 +42,7 @@ class TestBaseExportAsync(common.TransactionCase):
     def setUp(self):
-        super(TestBaseExportAsync, self).setUp()
+        super().setUp()
         self.delay_export_obj = self.env["delay.export"]
         self.job_obj = self.env["queue.job"]
         _request_stack.push(
diff --git a/base_import_async/data/queue_job_function_data.xml b/base_import_async/data/queue_job_function_data.xml
index 22cc8dbab0..fb04a63613 100644
--- a/base_import_async/data/queue_job_function_data.xml
+++ b/base_import_async/data/queue_job_function_data.xml
@@ -1,7 +1,13 @@
+
+    base_import
+
+
+    _split_file
+    _import_one_chunk
+

Delaying jobs

Note: delay() must be called on the delayable, chain, or group which is at the top of the graph. In the example above, if it were called on group_a, then group_b would never be delayed (but a warning would be shown).
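For illustration, a minimal sketch of delaying a whole graph (job_a and job_b are hypothetical job methods on the model):

from odoo.addons.queue_job.delay import chain, group

def button_run_graph(self):
    group_a = group(self.delayable().job_a(1), self.delayable().job_a(2))
    group_b = group(self.delayable().job_b())
    # group_b starts only after all jobs of group_a are done
    graph = chain(group_a, group_b)
    # delay() is called on the chain, the top of the graph;
    # calling group_a.delay() instead would leave group_b undelayed
    graph.delay()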

+

+It is also possible to split a job into several jobs, each one processing a part of the
+work. This can be useful to avoid very long jobs, parallelize some tasks and get more specific
+errors. Usage is as follows:

+
+def button_split_delayable(self):
+    (
+        self  # Can be a big recordset, let's say 1000 records
+        .delayable()
+        .generate_thumbnail((50, 50))
+        .set(priority=30)
+        .set(description=_("generate xxx"))
+        .split(50)  # Split the job into 20 jobs of 50 records each
+        .delay()
+    )
+
+

+The split() method takes a chain boolean keyword argument. If set to
+True, the jobs will be chained, meaning that the next job will only start when the previous
+one is done:

+
+def button_increment_var(self):
+    (
+        self
+        .delayable()
+        .increment_counter()
+        .split(1, chain=True)  # Will execute the jobs one after the other
+        .delay()
+    )
+

Enqueuing Job Options

@@ -734,11 +762,11 @@

Configure default options for job

Bypass jobs on running Odoo

When you are developing (e.g. connector modules) you might want to bypass the queue job and run your code immediately.

-To do so you can set TEST_QUEUE_JOB_NO_DELAY=1 in your enviroment.
+To do so you can set QUEUE_JOB__NO_DELAY=1 in your environment.

Bypass jobs in tests

When writing tests on job-related methods it is always tricky to deal with
delayed recordsets. To make your testing life easier
-you can set test_queue_job_no_delay=True in the context.
+you can set queue_job__no_delay=True in the context.

Tip: you can do this at test case level like this

 @classmethod
@@ -746,7 +774,7 @@ 

Configure default options for job

     super().setUpClass()
     cls.env = cls.env(context=dict(
         cls.env.context,
-        test_queue_job_no_delay=True,  # no jobs thanks
+        queue_job__no_delay=True,  # no jobs thanks
     ))

Then all your tests execute the job methods synchronously.

@@ -842,7 +870,7 @@

Testing

Execute jobs synchronously when running Odoo

When you are developing (e.g. connector modules) you might want to bypass the queue job and run your code immediately.

-To do so you can set TEST_QUEUE_JOB_NO_DELAY=1 in your environment.
+To do so you can set QUEUE_JOB__NO_DELAY=1 in your environment.

Warning

Do not do this in production

@@ -850,7 +878,7 @@

Testing

Execute jobs synchronously in tests

You should use trap_jobs, really, but if for any reason you could not use it,
and still need to have job methods executed synchronously in your tests, you can
-do so by setting test_queue_job_no_delay=True in the context.
+do so by setting queue_job__no_delay=True in the context.
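For reference, a minimal sketch of the recommended trap_jobs approach (button_do_stuff is a hypothetical method that enqueues a job via with_delay()):

from odoo.addons.queue_job.tests.common import trap_jobs

def test_button_do_stuff(self):
    with trap_jobs() as trap:
        self.env["res.partner"].button_do_stuff()  # hypothetical job-enqueuing method
        # exactly one job was enqueued while trapped
        trap.assert_jobs_count(1)
        # run the trapped jobs synchronously
        trap.perform_enqueued_jobs()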

Tip: you can do this at test case level like this

 @classmethod
@@ -858,7 +886,7 @@ 

Testing

     super().setUpClass()
     cls.env = cls.env(context=dict(
         cls.env.context,
-        test_queue_job_no_delay=True,  # no jobs thanks
+        queue_job__no_delay=True,  # no jobs thanks
     ))

Then all your tests execute the job methods synchronously without delaying any
jobs.

@@ -868,7 +896,7 @@

Testing

@mute_logger('odoo.addons.queue_job.models.base')

Note

-in graphs of jobs, the test_queue_job_no_delay context key must be in at
+in graphs of jobs, the queue_job__no_delay context key must be in at
 least one job's env of the graph for the whole graph to be executed
 synchronously
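A hedged sketch of that rule (step_1 and step_2 are hypothetical job methods):

from odoo.addons.queue_job.delay import chain

def test_graph_runs_synchronously(self):
    records = self.env["res.partner"].with_context(queue_job__no_delay=True)
    # the key is in these delayables' env, so the whole graph runs synchronously
    chain(records.delayable().step_1(), records.delayable().step_2()).delay()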

diff --git a/queue_job/static/src/js/queue_job_fields.js b/queue_job/static/src/js/queue_job_fields.js
index 3829ad7145..7da8a6cfd6 100644
--- a/queue_job/static/src/js/queue_job_fields.js
+++ b/queue_job/static/src/js/queue_job_fields.js
@@ -17,6 +17,7 @@ odoo.define("queue_job.fields", function (require) {
         init: function () {
             this._super.apply(this, arguments);
             this.network = null;
+            this.forceRender = false;
             this.tabListenerInstalled = false;
         },
         start: function () {
@@ -89,6 +90,25 @@ odoo.define("queue_job.fields", function (require) {
                 });
             });
+            if (nodes.length * edges.length > 5000 && !this.forceRender) {
+                const warningDiv = document.createElement("div");
+                warningDiv.className = "alert alert-warning";
+                warningDiv.innerText =
+                    `This graph is big (${nodes.length} nodes, ` +
+                    `${edges.length} edges), it may take a while to display.`;
+                const button = document.createElement("button");
+                button.innerText = "Display anyway";
+                button.className = "btn btn-secondary";
+                button.onclick = function () {
+                    self.forceRender = true;
+                    warningDiv.parentNode.removeChild(warningDiv);
+                    self._render();
+                };
+                warningDiv.appendChild(button);
+                this.$el.append(warningDiv);
+                return;
+            }
+
             var data = {
                 nodes: new vis.DataSet(nodes),
                 edges: new vis.DataSet(edges),
diff --git a/queue_job/static/src/scss/queue_job_fields.scss b/queue_job/static/src/scss/queue_job_fields.scss
index 150469a384..64cb6465cf 100644
--- a/queue_job/static/src/scss/queue_job_fields.scss
+++ b/queue_job/static/src/scss/queue_job_fields.scss
@@ -2,4 +2,11 @@
     width: 600px;
     height: 400px;
     border: 1px solid lightgray;
+    .alert {
+        height: 100%;
+        display: flex;
+        flex-direction: column;
+        align-items: center;
+        justify-content: center;
+    }
 }
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py
index e0ff9576a5..db53ac3a60 100644
--- a/queue_job/tests/__init__.py
+++ b/queue_job/tests/__init__.py
@@ -1,6 +1,7 @@
 from . import test_runner_channels
 from . import test_runner_runner
 from . import test_delayable
+from . import test_delayable_split
 from . import test_json_field
 from . import test_model_job_channel
 from . import test_model_job_function
diff --git a/queue_job/tests/common.py b/queue_job/tests/common.py
index 6bbc5be9e4..7afb1a2c9f 100644
--- a/queue_job/tests/common.py
+++ b/queue_job/tests/common.py
@@ -427,7 +427,7 @@ def __init__(
     def setUp(self):
         """Log an extra statement which test is started."""
-        super(OdooDocTestCase, self).setUp()
+        super().setUp()
         logging.getLogger(__name__).info("Running tests for %s", self._dt_test.name)
diff --git a/queue_job/tests/test_delayable.py b/queue_job/tests/test_delayable.py
index 097c29f25e..6284ce80ab 100644
--- a/queue_job/tests/test_delayable.py
+++ b/queue_job/tests/test_delayable.py
@@ -1,13 +1,15 @@
 # copyright 2019 Camptocamp
 # license agpl-3.0 or later (http://www.gnu.org/licenses/agpl.html)
-import unittest
 from unittest import mock
 
+from odoo.tests import common
+
+# pylint: disable=odoo-addons-relative-import
 from odoo.addons.queue_job.delay import Delayable, DelayableGraph
 
 
-class TestDelayable(unittest.TestCase):
+class TestDelayable(common.BaseCase):
     def setUp(self):
         super().setUp()
         self.recordset = mock.MagicMock(name="recordset")
diff --git a/queue_job/tests/test_delayable_split.py b/queue_job/tests/test_delayable_split.py
new file mode 100644
index 0000000000..b761878b2e
--- /dev/null
+++ b/queue_job/tests/test_delayable_split.py
@@ -0,0 +1,94 @@
+# Copyright 2024 Akretion (http://www.akretion.com).
+# @author Florian Mounier
+# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
+
+from odoo.tests import common
+
+# pylint: disable=odoo-addons-relative-import
+from odoo.addons.queue_job.delay import Delayable
+
+
+class TestDelayableSplit(common.BaseCase):
+    def setUp(self):
+        super().setUp()
+
+        class FakeRecordSet(list):
+            def __init__(self, *args, **kwargs):
+                super().__init__(*args, **kwargs)
+                self._name = "recordset"
+
+            def __getitem__(self, key):
+                if isinstance(key, slice):
+                    return FakeRecordSet(super().__getitem__(key))
+                return super().__getitem__(key)
+
+            def method(self, arg, kwarg=None):
+                """Method to be called"""
+                return arg, kwarg
+
+        self.FakeRecordSet = FakeRecordSet
+
+    def test_delayable_split_no_method_call_beforehand(self):
+        dl = Delayable(self.FakeRecordSet(range(20)))
+        with self.assertRaises(ValueError):
+            dl.split(3)
+
+    def test_delayable_split_10_3(self):
+        dl = Delayable(self.FakeRecordSet(range(10)))
+        dl.method("arg", kwarg="kwarg")
+        group = dl.split(3)
+        self.assertEqual(len(group._delayables), 4)
+        delayables = sorted(list(group._delayables), key=lambda x: x.description)
+        self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2]))
+        self.assertEqual(delayables[1].recordset, self.FakeRecordSet([3, 4, 5]))
+        self.assertEqual(delayables[2].recordset, self.FakeRecordSet([6, 7, 8]))
+        self.assertEqual(delayables[3].recordset, self.FakeRecordSet([9]))
+        self.assertEqual(delayables[0].description, "Method to be called (split 1/4)")
+        self.assertEqual(delayables[1].description, "Method to be called (split 2/4)")
+        self.assertEqual(delayables[2].description, "Method to be called (split 3/4)")
+        self.assertEqual(delayables[3].description, "Method to be called (split 4/4)")
+        self.assertNotEqual(delayables[0]._job_method, dl._job_method)
+        self.assertNotEqual(delayables[1]._job_method, dl._job_method)
+        self.assertNotEqual(delayables[2]._job_method, dl._job_method)
+        self.assertNotEqual(delayables[3]._job_method, dl._job_method)
+        self.assertEqual(delayables[0]._job_method.__name__, dl._job_method.__name__)
+        self.assertEqual(delayables[1]._job_method.__name__, dl._job_method.__name__)
+        self.assertEqual(delayables[2]._job_method.__name__, dl._job_method.__name__)
+        self.assertEqual(delayables[3]._job_method.__name__, dl._job_method.__name__)
+        self.assertEqual(delayables[0]._job_args, ("arg",))
+        self.assertEqual(delayables[1]._job_args, ("arg",))
+        self.assertEqual(delayables[2]._job_args, ("arg",))
+        self.assertEqual(delayables[3]._job_args, ("arg",))
+        self.assertEqual(delayables[0]._job_kwargs, {"kwarg": "kwarg"})
+        self.assertEqual(delayables[1]._job_kwargs, {"kwarg": "kwarg"})
+        self.assertEqual(delayables[2]._job_kwargs, {"kwarg": "kwarg"})
+        self.assertEqual(delayables[3]._job_kwargs, {"kwarg": "kwarg"})
+
+    def test_delayable_split_10_5(self):
+        dl = Delayable(self.FakeRecordSet(range(10)))
+        dl.method("arg", kwarg="kwarg")
+        group = dl.split(5)
+        self.assertEqual(len(group._delayables), 2)
+        delayables = sorted(list(group._delayables), key=lambda x: x.description)
+        self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2, 3, 4]))
+        self.assertEqual(delayables[1].recordset, self.FakeRecordSet([5, 6, 7, 8, 9]))
+        self.assertEqual(delayables[0].description, "Method to be called (split 1/2)")
+        self.assertEqual(delayables[1].description, "Method to be called (split 2/2)")
+
+    def test_delayable_split_10_10(self):
+        dl = Delayable(self.FakeRecordSet(range(10)))
+        dl.method("arg", kwarg="kwarg")
+        group = dl.split(10)
+        self.assertEqual(len(group._delayables), 1)
+        delayables = sorted(list(group._delayables), key=lambda x: x.description)
+        self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10)))
+        self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")
+
+    def test_delayable_split_10_20(self):
+        dl = Delayable(self.FakeRecordSet(range(10)))
+        dl.method("arg", kwarg="kwarg")
+        group = dl.split(20)
+        self.assertEqual(len(group._delayables), 1)
+        delayables = sorted(list(group._delayables), key=lambda x: x.description)
+        self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10)))
+        self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")
diff --git a/queue_job/utils.py b/queue_job/utils.py
new file mode 100644
index 0000000000..5134cd1068
--- /dev/null
+++ b/queue_job/utils.py
@@ -0,0 +1,40 @@
+# Copyright 2023 Camptocamp
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+import logging
+import os
+
+_logger = logging.getLogger(__name__)
+
+
+def must_run_without_delay(env):
+    """Return True if jobs have to run immediately.
+
+    :param env: `odoo.api.Environment` instance
+    """
+    # TODO: drop in v17
+    if os.getenv("TEST_QUEUE_JOB_NO_DELAY"):
+        _logger.warning(
+            "`TEST_QUEUE_JOB_NO_DELAY` env var found. NO JOB scheduled. "
+            "Note that this key is deprecated: please use `QUEUE_JOB__NO_DELAY`"
+        )
+        return True
+
+    if os.getenv("QUEUE_JOB__NO_DELAY"):
+        _logger.warning("`QUEUE_JOB__NO_DELAY` env var found. NO JOB scheduled.")
+        return True
+
+    # TODO: drop in v17
+    deprecated_keys = ("_job_force_sync", "test_queue_job_no_delay")
+    for key in deprecated_keys:
+        if env.context.get(key):
+            _logger.warning(
+                "`%s` ctx key found. NO JOB scheduled. "
+                "Note that this key is deprecated: please use `queue_job__no_delay`",
+                key,
+            )
+            return True
+
+    if env.context.get("queue_job__no_delay"):
+        _logger.warning("`queue_job__no_delay` ctx key found. NO JOB scheduled.")
+        return True
diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml
index 51336f5197..c6e2481b71 100644
--- a/queue_job/views/queue_job_views.xml
+++ b/queue_job/views/queue_job_views.xml
@@ -249,6 +249,22 @@
             string="Cancelled"
             domain="[('state', '=', 'cancelled')]"
         />
+
+
+
+
diff --git a/queue_job_cron/models/ir_cron.py b/queue_job_cron/models/ir_cron.py
index 440740f164..bb09ed075e 100644
--- a/queue_job_cron/models/ir_cron.py
+++ b/queue_job_cron/models/ir_cron.py
@@ -4,12 +4,23 @@
 from odoo import api, fields, models
 
+from odoo.addons.queue_job.job import identity_exact
+
 _logger = logging.getLogger(__name__)
 
 
 class IrCron(models.Model):
     _inherit = "ir.cron"
 
+    no_parallel_queue_job_run = fields.Boolean(
+        help="Avoid parallel run. "
+        "If the cron job is already running, the new one will be skipped. "
+        "By default, odoo never runs the same cron job in parallel. This "
+        "option is therefore set to True by default when the job is run as a "
+        "queue job.",
+        default=True,
+    )
+
     run_as_queue_job = fields.Boolean(
         help="Specify if this cron should be ran as a queue job"
     )
@@ -17,13 +28,16 @@ class IrCron(models.Model):
         comodel_name="queue.job.channel",
         compute="_compute_run_as_queue_job",
         readonly=False,
+        store=True,
         string="Channel",
     )
 
     @api.depends("run_as_queue_job")
     def _compute_run_as_queue_job(self):
         for cron in self:
-            if cron.run_as_queue_job and not cron.channel_id:
+            if cron.channel_id:
+                continue
+            if cron.run_as_queue_job:
                 cron.channel_id = self.env.ref("queue_job_cron.channel_root_ir_cron").id
             else:
                 cron.channel_id = False
@@ -39,23 +53,29 @@ def method_direct_trigger(self):
             _cron = cron.with_user(cron.user_id).with_context(
                 lastcall=cron.lastcall
             )
-            _cron.with_delay(
-                priority=_cron.priority,
-                description=_cron.name,
-                channel=_cron.channel_id.complete_name,
-            )._run_job_as_queue_job(server_action=_cron.ir_actions_server_id)
+            _cron._delay_run_job_as_queue_job(
+                server_action=_cron.ir_actions_server_id
+            )
         return True
 
     def _callback(self, cron_name, server_action_id, job_id):
         cron = self.env["ir.cron"].sudo().browse(job_id)
         if cron.run_as_queue_job:
             server_action = self.env["ir.actions.server"].browse(server_action_id)
-            return self.with_delay(
-                priority=cron.priority,
-                description=cron.name,
-                channel=cron.channel_id.complete_name,
-            )._run_job_as_queue_job(server_action=server_action)
+            return cron._delay_run_job_as_queue_job(server_action=server_action)
         else:
             return super()._callback(
                 cron_name=cron_name, server_action_id=server_action_id, job_id=job_id
             )
+
+    def _delay_run_job_as_queue_job(self, server_action):
+        self.ensure_one()
+        identity_key = None
+        if self.no_parallel_queue_job_run:
+            identity_key = identity_exact
+        return self.with_delay(
+            priority=self.priority,
+            description=self.name,
+            channel=self.channel_id.complete_name,
+            identity_key=identity_key,
+        )._run_job_as_queue_job(server_action=server_action)
diff --git a/queue_job_cron/readme/newsfragments/.gitignore b/queue_job_cron/readme/newsfragments/.gitignore
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/queue_job_cron/readme/newsfragments/612.feature b/queue_job_cron/readme/newsfragments/612.feature
new file mode 100644
index 0000000000..9c521620a2
--- /dev/null
+++ b/queue_job_cron/readme/newsfragments/612.feature
@@ -0,0 +1,9 @@
+By default, prevent parallel runs of the same cron job when run as a queue job.
+
+When a cron job is run by odoo, the odoo runner will prevent parallel runs
+of the same cron job. Before this change, this was not the case when the
+cron job was run as a queue job. A new option is added to the cron job when
+run as a queue job to prevent parallel runs. This option is set to True by
+default. In this way, the behavior is now the same as when the cron job is run
+by odoo, but you keep the possibility to disable this restriction when run as
+a queue job.
diff --git a/queue_job_cron/tests/test_queue_job_cron.py b/queue_job_cron/tests/test_queue_job_cron.py
index 3eec55f7e9..d3cc18d636 100644
--- a/queue_job_cron/tests/test_queue_job_cron.py
+++ b/queue_job_cron/tests/test_queue_job_cron.py
@@ -39,3 +39,22 @@ def test_queue_job_cron_run(self):
         cron = self.env.ref("queue_job.ir_cron_autovacuum_queue_jobs")
         IrCron = self.env["ir.cron"]
         IrCron._run_job_as_queue_job(server_action=cron.ir_actions_server_id)
+
+    def test_queue_job_no_parallelism(self):
+        cron = self.env.ref("queue_job.ir_cron_autovacuum_queue_jobs")
+        default_channel = self.env.ref("queue_job_cron.channel_root_ir_cron")
+        cron.write(
+            {
+                "no_parallel_queue_job_run": True,
+                "run_as_queue_job": True,
+                "channel_id": default_channel.id,
+            }
+        )
+        cron.method_direct_trigger()
+        cron.method_direct_trigger()
+        nb_jobs = self.env["queue.job"].search_count([("name", "=", cron.name)])
+        self.assertEqual(nb_jobs, 1)
+        cron.no_parallel_queue_job_run = False
+        cron.method_direct_trigger()
+        nb_jobs = self.env["queue.job"].search_count([("name", "=", cron.name)])
+        self.assertEqual(nb_jobs, 2)
diff --git a/queue_job_cron/views/ir_cron_view.xml b/queue_job_cron/views/ir_cron_view.xml
index 1c2c271fb1..2b3567bfcf 100644
--- a/queue_job_cron/views/ir_cron_view.xml
+++ b/queue_job_cron/views/ir_cron_view.xml
@@ -7,6 +7,10 @@
+