[15.0][UPD] Forward port changes from 14.0 #730

Open · wants to merge 14 commits into base: 15.0
2 changes: 1 addition & 1 deletion base_export_async/tests/test_base_export_async.py
@@ -42,7 +42,7 @@

class TestBaseExportAsync(common.TransactionCase):
def setUp(self):
super(TestBaseExportAsync, self).setUp()
super().setUp()
self.delay_export_obj = self.env["delay.export"]
self.job_obj = self.env["queue.job"]
_request_stack.push(
7 changes: 7 additions & 0 deletions base_import_async/data/queue_job_function_data.xml
@@ -1,7 +1,13 @@
<odoo noupdate="1">
<record model="queue.job.channel" id="channel_base_import">
<field name="name">base_import</field>
<field name="parent_id" ref="queue_job.channel_root" />
</record>

<record id="job_function_base_import_import_split_file" model="queue.job.function">
<field name="model_id" ref="base_import.model_base_import_import" />
<field name="method">_split_file</field>
<field name="channel_id" ref="channel_base_import" />
<field
name="related_action"
eval='{"func_name": "_related_action_attachment"}'
@@ -13,6 +19,7 @@
>
<field name="model_id" ref="base_import.model_base_import_import" />
<field name="method">_import_one_chunk</field>
<field name="channel_id" ref="channel_base_import" />
<field
name="related_action"
eval='{"func_name": "_related_action_attachment"}'
49 changes: 41 additions & 8 deletions queue_job/README.rst
@@ -254,6 +254,38 @@ Note: ``delay()`` must be called on the delayable, chain, or group which is at the top
of the graph. In the example above, if it was called on ``group_a``, then ``group_b``
would never be delayed (but a warning would be shown).
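
For instance, a minimal sketch (assuming the ``group`` helper from
``odoo.addons.queue_job.delay`` and two hypothetical ``step_one``/``step_two``
job methods):

.. code-block:: python

    first = group(self.delayable().step_one())
    second = group(self.delayable().step_two())
    first.on_done(second)  # ``second`` runs once ``first`` is done
    first.delay()  # ``first`` is the top of this graph, so delay() goes here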

It is also possible to split a job into several jobs, each one processing a part of the
work. This can be useful to avoid very long jobs, to parallelize some tasks and to get
more specific errors. Usage is as follows:

.. code-block:: python

def button_split_delayable(self):
(
self # Can be a big recordset, let's say 1000 records
.delayable()
.generate_thumbnail((50, 50))
.set(priority=30)
.set(description=_("generate xxx"))
.split(50) # Split the job into 20 jobs of 50 records each
.delay()
)

The ``split()`` method takes a ``chain`` boolean keyword argument. If set to
``True``, the jobs will be chained, meaning that the next job will only start when
the previous one is done:

.. code-block:: python

def button_increment_var(self):
(
self
.delayable()
.increment_counter()
.split(1, chain=True) # Will execute the jobs one after the other
.delay()
)


Enqueing Job Options
--------------------
@@ -421,13 +453,14 @@ Example:
When you are developing (e.g. connector modules) you might want
to bypass the queue job and run your code immediately.

To do so you can set `TEST_QUEUE_JOB_NO_DELAY=1` in your enviroment.
To do so you can set `QUEUE_JOB__NO_DELAY=1` in your environment.

**Bypass jobs in tests**

When writing tests on job-related methods, it is always tricky to deal with
delayed recordsets. To make your testing life easier
you can set `test_queue_job_no_delay=True` in the context.
delayed recordsets. To make your testing life easier,
or to run a delayed action immediately,
you can set `queue_job__no_delay=True` in the context.

Tip: you can do this at test case level like this

@@ -438,7 +471,7 @@
super().setUpClass()
cls.env = cls.env(context=dict(
cls.env.context,
test_queue_job_no_delay=True, # no jobs thanks
queue_job__no_delay=True, # no jobs thanks
))

Then all your tests execute the job methods synchronously
@@ -543,15 +576,15 @@ If you prefer, you can still test the whole thing in a single test, by calling
When you are developing (e.g. connector modules) you might want
to bypass the queue job and run your code immediately.

To do so you can set ``TEST_QUEUE_JOB_NO_DELAY=1`` in your environment.
To do so you can set ``QUEUE_JOB__NO_DELAY=1`` in your environment.

.. WARNING:: Do not do this in production

**Execute jobs synchronously in tests**

You should use ``trap_jobs``, really, but if for any reason you could not use it,
and still need to have job methods executed synchronously in your tests, you can
do so by setting ``test_queue_job_no_delay=True`` in the context.
do so by setting ``queue_job__no_delay=True`` in the context.
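
For reference, a minimal ``trap_jobs`` based test could look like the sketch
below (``self.records`` and the ``button_done`` job method are hypothetical;
check the helper signatures in ``odoo.addons.queue_job.tests.common`` for your
version):

.. code-block:: python

    from odoo.addons.queue_job.tests.common import trap_jobs

    def test_button_done(self):
        with trap_jobs() as trap:
            self.records.button_done()
            trap.assert_jobs_count(1)
            trap.perform_enqueued_jobs()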

Tip: you can set the context key at test case level like this

@@ -562,7 +595,7 @@
super().setUpClass()
cls.env = cls.env(context=dict(
cls.env.context,
test_queue_job_no_delay=True, # no jobs thanks
queue_job__no_delay=True, # no jobs thanks
))

Then all your tests execute the job methods synchronously without delaying any
Expand All @@ -572,7 +605,7 @@ In tests you'll have to mute the logger like:

@mute_logger('odoo.addons.queue_job.models.base')

.. NOTE:: in graphs of jobs, the ``test_queue_job_no_delay`` context key must be in at
.. NOTE:: in graphs of jobs, the ``queue_job__no_delay`` context key must be in at
least one job's env of the graph for the whole graph to be executed synchronously


60 changes: 49 additions & 11 deletions queue_job/delay.py
@@ -4,11 +4,11 @@

import itertools
import logging
import os
import uuid
from collections import defaultdict, deque

from .job import Job
from .utils import must_run_without_delay
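# ``must_run_without_delay`` lives in the new ``queue_job/utils.py``, which is
# not part of this diff. Judging by the checks it replaces below, it presumably
# does something along these lines (a sketch, not the actual implementation):
#
#     def must_run_without_delay(env):
#         return bool(
#             os.getenv("QUEUE_JOB__NO_DELAY")
#             or env.context.get("queue_job__no_delay")
#         )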

_logger = logging.getLogger(__name__)

@@ -217,17 +217,9 @@ def _has_to_execute_directly(self, vertices):
In tests, prefer to use
:func:`odoo.addons.queue_job.tests.common.trap_jobs`.
"""
if os.getenv("TEST_QUEUE_JOB_NO_DELAY"):
_logger.warning(
"`TEST_QUEUE_JOB_NO_DELAY` env var found. NO JOB scheduled."
)
return True
envs = {vertex.recordset.env for vertex in vertices}
for env in envs:
if env.context.get("test_queue_job_no_delay"):
_logger.warning(
"`test_queue_job_no_delay` ctx key found. NO JOB scheduled."
)
if must_run_without_delay(env):
return True
return False

@@ -534,6 +526,52 @@ def delay(self):
"""Delay the whole graph"""
self._graph.delay()

def split(self, size, chain=False):
"""Split the Delayable into batches of ``size`` records.

Return a ``DelayableGroup`` or, if ``chain`` is True, a
``DelayableChain`` containing one new Delayable per batch.
"""
if not self._job_method:
raise ValueError("No method set on the Delayable")

total_records = len(self.recordset)

delayables = []
for index in range(0, total_records, size):
recordset = self.recordset[index : index + size]
delayable = Delayable(
recordset,
priority=self.priority,
eta=self.eta,
max_retries=self.max_retries,
description=self.description,
channel=self.channel,
identity_key=self.identity_key,
)
# Rebind the job method to the batch recordset (updates its __self__)
delayable._job_method = getattr(recordset, self._job_method.__name__)
delayable._job_args = self._job_args
delayable._job_kwargs = self._job_kwargs

delayables.append(delayable)

description = self.description or (
self._job_method.__doc__.splitlines()[0].strip()
if self._job_method.__doc__
else "{}.{}".format(self.recordset._name, self._job_method.__name__)
)
for index, delayable in enumerate(delayables):
delayable.set(
description="%s (split %s/%s)"
% (description, index + 1, len(delayables))
)

# Prevent warning on deletion
self._generated_job = True

return (DelayableChain if chain else DelayableGroup)(*delayables)

def _build_job(self):
if self._generated_job:
return self._generated_job
Expand Down Expand Up @@ -571,7 +609,7 @@ def _execute_direct(self):
self._generated_job.perform()


class DelayableRecordset(object):
class DelayableRecordset:
"""Allow to delay a method for a recordset (shortcut way)

Usage::
14 changes: 11 additions & 3 deletions queue_job/job.py
@@ -105,7 +105,7 @@ def identity_exact_hasher(job_):


@total_ordering
class Job(object):
class Job:
"""A Job is a task to execute. It is the in-memory representation of a job.

Jobs are stored in the ``queue.job`` Odoo Model, but they are handled
@@ -539,8 +539,8 @@ def perform(self):

return self.result

def enqueue_waiting(self):
sql = """
def _get_common_dependent_jobs_query(self):
return """
UPDATE queue_job
SET state = %s
FROM (
@@ -568,9 +568,17 @@ def enqueue_waiting(self):
AND %s = ALL(jobs.parent_states)
AND state = %s;
"""

def enqueue_waiting(self):
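# Switch jobs waiting on this one to PENDING once all their parents are DONE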
sql = self._get_common_dependent_jobs_query()
self.env.cr.execute(sql, (PENDING, self.uuid, DONE, WAIT_DEPENDENCIES))
self.env["queue.job"].invalidate_cache(["state"])

def cancel_dependent_jobs(self):
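# Cancel jobs waiting on this one once all their parents are CANCELLED
# (same UPDATE as enqueue_waiting, with different state parameters)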
sql = self._get_common_dependent_jobs_query()
self.env.cr.execute(sql, (CANCELLED, self.uuid, CANCELLED, WAIT_DEPENDENCIES))
self.env["queue.job"].invalidate_cache(["state"])

def store(self):
"""Store the Job"""
job_model = self.env["queue.job"]
10 changes: 5 additions & 5 deletions queue_job/jobrunner/channels.py
@@ -14,7 +14,7 @@
_logger = logging.getLogger(__name__)


class PriorityQueue(object):
class PriorityQueue:
"""A priority queue that supports removing arbitrary objects.

Adding an object already in the queue is a no op.
@@ -103,7 +103,7 @@ def pop(self):


@total_ordering
class ChannelJob(object):
class ChannelJob:
"""A channel job is attached to a channel and holds the properties of a
job that are necessary to prioritise them.

@@ -205,7 +205,7 @@ def __lt__(self, other):
return self.sorting_key() < other.sorting_key()


class ChannelQueue(object):
class ChannelQueue:
"""A channel queue is a priority queue for jobs.

Jobs with an eta are set aside until their eta is past due, at
@@ -334,7 +334,7 @@ def get_wakeup_time(self, wakeup_time=0):
return wakeup_time


class Channel(object):
class Channel:
"""A channel for jobs, with a maximum capacity.

When jobs are created by queue_job modules, they may be associated
@@ -581,7 +581,7 @@ def split_strip(s, sep, maxsplit=-1):
return [x.strip() for x in s.split(sep, maxsplit)]


class ChannelManager(object):
class ChannelManager:
"""High level interface for channels

This class handles:
4 changes: 2 additions & 2 deletions queue_job/jobrunner/runner.py
@@ -259,7 +259,7 @@ def urlopen():
thread.start()


class Database(object):
class Database:
def __init__(self, db_name):
self.db_name = db_name
connection_info = _connection_info_for(db_name)
@@ -344,7 +344,7 @@ def set_job_enqueued(self, uuid):
)


class QueueJobRunner(object):
class QueueJobRunner:
def __init__(
self,
scheme="http",
7 changes: 2 additions & 5 deletions queue_job/models/base.py
@@ -2,14 +2,12 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import functools
import logging

from odoo import api, models

from ..delay import Delayable
from ..job import DelayableRecordset

_logger = logging.getLogger(__name__)
from ..utils import must_run_without_delay


class Base(models.AbstractModel):
@@ -216,8 +214,7 @@ def auto_delay_wrapper(self, *args, **kwargs):
if (
self.env.context.get("job_uuid")
or not context_delay
or self.env.context.get("_job_force_sync")
or self.env.context.get("test_queue_job_no_delay")
or must_run_without_delay(self.env)
):
# we are in the job execution
return auto_delay_wrapper.origin(self, *args, **kwargs)
2 changes: 2 additions & 0 deletions queue_job/models/queue_job.py
@@ -328,6 +328,8 @@ def _change_job_state(self, state, result=None):
elif state == CANCELLED:
job_.set_cancelled(result=result)
job_.store()
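# flush the state change to the database so the raw SQL UPDATE in
# cancel_dependent_jobs() can see it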
record.env["queue.job"].flush()
job_.cancel_dependent_jobs()
else:
raise ValueError("State not supported: %s" % state)
