Skip to content

Commit d72fbca

Browse files
committed
[IMP] queue_job_cron_jobrunner: channel
1 parent 813ad66 commit d72fbca

File tree

2 files changed

+147
-38
lines changed

2 files changed

+147
-38
lines changed

queue_job_cron_jobrunner/models/queue_job.py

+69-21
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
RetryableJobError,
2020
)
2121
from odoo.addons.queue_job.job import Job
22+
from odoo.addons.queue_job.jobrunner import QueueJobRunner
2223

2324
_logger = logging.getLogger(__name__)
2425

@@ -27,47 +28,86 @@ class QueueJob(models.Model):
2728
_inherit = "queue.job"
2829

2930
@api.model
30-
def _acquire_one_job(self):
31+
def _acquire_one_job(self, commit=False):
3132
"""Acquire the next job to be run.
3233
3334
:returns: queue.job record (locked for update)
3435
"""
35-
# TODO: This method should respect channel priority and capacity,
36-
# rather than just fetching them by creation date.
37-
self.flush()
36+
runner = QueueJobRunner.from_environ_or_config()
3837
self.env.cr.execute(
3938
"""
4039
SELECT id
4140
FROM queue_job
4241
WHERE state = 'pending'
4342
AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
4443
ORDER BY priority, date_created
45-
LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
44+
FOR NO KEY UPDATE
4645
"""
4746
)
48-
row = self.env.cr.fetchone()
49-
return self.browse(row and row[0])
47+
rows = self.env.cr.fetchall()
48+
49+
channels = {}
50+
for queue_job in self.search([("state", "=", "started")]):
51+
if not queue_job.channel:
52+
continue
53+
channels[queue_job.channel] = channels.get(queue_job.channel, 0) + 1
54+
channels_without_capacity = set()
55+
for channel_str, running in channels.items():
56+
channel = runner.channel_manager.get_channel_by_name(
57+
channel_str, autocreate=True
58+
)
59+
if channel.capacity and channel.capacity <= running:
60+
channels_without_capacity.add(channel_str)
61+
channels_without_capacity.discard(
62+
"root"
63+
) # root must be disabled to avoid normal jobrunner
64+
_logger.info(
65+
"_acquire_one_job channels_without_capacity %s",
66+
channels_without_capacity,
67+
)
68+
69+
result = self.browse()
70+
for row in rows:
71+
queue_job = self.browse(row[0])
72+
if queue_job.channel and queue_job.channel in channels_without_capacity:
73+
continue
74+
job = Job._load_from_db_record(queue_job)
75+
job.set_started()
76+
job.store()
77+
_logger.info(
78+
"_acquire_one_job queue.job %s[channel=%s,uuid=%s] started",
79+
row[0],
80+
job.channel,
81+
job.uuid,
82+
)
83+
result = queue_job
84+
break
85+
self.flush()
86+
if commit: # pragma: no cover
87+
self.env.cr.commit() # pylint: disable=invalid-commit
88+
return result
5089

5190
def _process(self, commit=False):
5291
"""Process the job"""
5392
self.ensure_one()
5493
job = Job._load_from_db_record(self)
55-
# Set it as started
56-
job.set_started()
57-
job.store()
58-
_logger.debug("%s started", job.uuid)
59-
# TODO: Commit the state change so that the state can be read from the UI
60-
# while the job is processing. However, doing this will release the
61-
# lock on the db, so we need to find another way.
62-
# if commit:
63-
# self.flush()
64-
# self.env.cr.commit()
65-
6694
# Actual processing
6795
try:
6896
try:
6997
with self.env.cr.savepoint():
98+
_logger.info(
99+
"perform %s[channel=%s,uuid=%s]",
100+
self.id,
101+
self.channel,
102+
self.uuid,
103+
)
70104
job.perform()
105+
_logger.info(
106+
"performed %s[channel=%s,uuid=%s]",
107+
self.id,
108+
self.channel,
109+
self.uuid,
110+
)
71111
job.set_done()
72112
job.store()
73113
except OperationalError as err:
@@ -87,20 +127,28 @@ def _process(self, commit=False):
87127
msg = _("Job interrupted and set to Done: nothing to do.")
88128
job.set_done(msg)
89129
job.store()
130+
_logger.info(
131+
"interrupted %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
132+
)
90133

91134
except RetryableJobError as err:
92135
# delay the job later, requeue
93136
job.postpone(result=str(err), seconds=5)
94137
job.set_pending(reset_retry=False)
95138
job.store()
96-
_logger.debug("%s postponed", job)
139+
_logger.info(
140+
"postponed %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
141+
)
97142

98143
except (FailedJobError, Exception):
99144
with StringIO() as buff:
100145
traceback.print_exc(file=buff)
101146
_logger.error(buff.getvalue())
102147
job.set_failed(exc_info=buff.getvalue())
103148
job.store()
149+
_logger.info(
150+
"failed %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
151+
)
104152

105153
if commit: # pragma: no cover
106154
self.env["base"].flush()
@@ -113,10 +161,10 @@ def _process(self, commit=False):
113161
@api.model
114162
def _job_runner(self, commit=True):
115163
"""Short-lived job runner, triggered by async crons"""
116-
job = self._acquire_one_job()
164+
job = self._acquire_one_job(commit=commit)
117165
while job:
118166
job._process(commit=commit)
119-
job = self._acquire_one_job()
167+
job = self._acquire_one_job(commit=commit)
120168
# TODO: If limit_time_real_cron is reached before all the jobs are done,
121169
# the worker will be killed abruptly.
122170
# Ideally, find a way to know if we're close to reaching this limit,

queue_job_cron_jobrunner/tests/test_queue_job.py

+78-17
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
44

55
from datetime import datetime
6+
from unittest import mock
67

78
from freezegun import freeze_time
89

910
from odoo import SUPERUSER_ID, api
1011
from odoo.tests.common import TransactionCase
1112
from odoo.tools import mute_logger
1213

14+
from odoo.addons.queue_job.jobrunner import QueueJobRunner
15+
1316

1417
class TestQueueJob(TransactionCase):
1518
def setUp(self):
@@ -56,7 +59,6 @@ def test_queue_job_cron_trigger_enqueue_dependencies(self):
5659
# if the state is "waiting_dependencies", it means the "enqueue_waiting()"
5760
# step has not been done when the parent job has been done
5861
self.assertEqual(job_record_depends.state, "done", "Processed OK")
59-
self.assertEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))
6062

6163
@freeze_time("2022-02-22 22:22:22")
6264
def test_concurrent_cron_access(self):
@@ -70,20 +72,7 @@ def test_concurrent_cron_access(self):
7072
(self.cron.id,),
7173
log_exceptions=False,
7274
)
73-
74-
delayable = self.env["res.partner"].delayable().create({"name": "test"})
75-
delayable2 = self.env["res.partner"].delayable().create({"name": "test2"})
76-
delayable.on_done(delayable2)
77-
delayable.delay()
78-
job_record = delayable._generated_job.db_record()
79-
job_record_depends = delayable2._generated_job.db_record()
80-
81-
self.env["queue.job"]._job_runner(commit=False)
82-
83-
self.assertEqual(job_record.state, "done", "Processed OK")
84-
# if the state is "waiting_dependencies", it means the "enqueue_waiting()"
85-
# step has not been done when the parent job has been done
86-
self.assertEqual(job_record_depends.state, "done", "Processed OK")
75+
self.env["res.partner"].delayable().create({"name": "test"})
8776
self.assertNotEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))
8877

8978
def test_acquire_one_job_use_priority(self):
@@ -98,7 +87,9 @@ def test_acquire_one_job_use_priority(self):
9887
with freeze_time("2024-01-01 10:03:01"):
9988
self.env["res.partner"].with_delay(priority=2).create({"name": "test"})
10089

101-
self.assertEqual(self.env["queue.job"]._acquire_one_job(), job.db_record())
90+
self.assertEqual(
91+
self.env["queue.job"]._acquire_one_job(commit=False), job.db_record()
92+
)
10293

10394
def test_acquire_one_job_consume_the_oldest_first(self):
10495
with freeze_time("2024-01-01 10:01:01"):
@@ -112,4 +103,74 @@ def test_acquire_one_job_consume_the_oldest_first(self):
112103
with freeze_time("2024-01-01 10:03:01"):
113104
self.env["res.partner"].with_delay(priority=30).create({"name": "test"})
114105

115-
self.assertEqual(self.env["queue.job"]._acquire_one_job(), job.db_record())
106+
self.assertEqual(
107+
self.env["queue.job"]._acquire_one_job(commit=False), job.db_record()
108+
)
109+
110+
def test_acquire_one_job_starts_job(self):
111+
job = self.env["res.partner"].with_delay(priority=1).create({"name": "test"})
112+
113+
result = self.env["queue.job"]._acquire_one_job(commit=False)
114+
115+
self.assertEqual(result, job.db_record())
116+
self.assertEqual(job.db_record().state, "started")
117+
118+
def test_acquire_one_job_do_not_overload_channel(self):
119+
runner = QueueJobRunner.from_environ_or_config()
120+
runner.channel_manager.get_channel_by_name(
121+
"root.foobar", autocreate=True
122+
).capacity = 2
123+
job1 = (
124+
self.env["res.partner"]
125+
.with_delay(channel="root.foobar")
126+
.create({"name": "test1"})
127+
)
128+
job2 = (
129+
self.env["res.partner"]
130+
.with_delay(channel="root.foobar")
131+
.create({"name": "test2"})
132+
)
133+
self.env["res.partner"].with_delay(channel="root.foobar").create(
134+
{"name": "test3"}
135+
)
136+
137+
with mock.patch.object(
138+
QueueJobRunner, "from_environ_or_config", return_value=runner
139+
):
140+
first_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
141+
second_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
142+
third_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
143+
144+
self.assertEqual(first_acquired_job, job1.db_record())
145+
self.assertEqual(second_acquired_job, job2.db_record())
146+
self.assertEqual(third_acquired_job, self.env["queue.job"].browse())
147+
148+
def test_acquire_one_job_root_capacity_ignored(self):
149+
runner = QueueJobRunner.from_environ_or_config()
150+
runner.channel_manager.get_channel_by_name("root", autocreate=True).capacity = 0
151+
job1 = (
152+
self.env["res.partner"].with_delay(channel="root").create({"name": "test1"})
153+
)
154+
job2 = (
155+
self.env["res.partner"].with_delay(channel="root").create({"name": "test2"})
156+
)
157+
job3 = (
158+
self.env["res.partner"].with_delay(channel="root").create({"name": "test3"})
159+
)
160+
161+
with mock.patch.object(
162+
QueueJobRunner, "from_environ_or_config", return_value=runner
163+
):
164+
first_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
165+
second_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
166+
third_acquired_job = self.env["queue.job"]._acquire_one_job(commit=False)
167+
168+
self.assertEqual(first_acquired_job, job1.db_record())
169+
self.assertEqual(second_acquired_job, job2.db_record())
170+
self.assertEqual(third_acquired_job, job3.db_record())
171+
172+
@freeze_time("2022-02-22 22:22:22")
173+
def test_queue_job_creation_create_change_next_call(self):
174+
self.cron.nextcall = datetime(2021, 1, 21, 21, 21, 21)
175+
self.env["res.partner"].with_delay().create({"name": "test"})
176+
self.assertNotEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))

0 commit comments

Comments
 (0)