Implement task generators and provisional nodes. #487

Merged
merged 109 commits into from
Mar 15, 2024
2cd96d7
Simplify TopologicalSorter.
tobiasraabe Oct 21, 2023
6f2aaef
Remove static_order.
tobiasraabe Oct 21, 2023
e37abd6
Remove backup.
tobiasraabe Oct 21, 2023
789e0d8
Add instantiation from another sorter and a DAG.
tobiasraabe Oct 22, 2023
b196584
Add a delayed node that can serve as a product.
tobiasraabe Oct 23, 2023
7bbb5b8
Raise errors when not delayed tasks receive delayed dependencies.
tobiasraabe Oct 23, 2023
af7eb2c
Everything works except tasks depending on delayed nodes.
tobiasraabe Oct 23, 2023
18318a3
Fix.
tobiasraabe Oct 23, 2023
5dd1be1
Merge branch 'main' into dag
tobiasraabe Oct 23, 2023
0436b3d
Merge branch 'dag' into delayed-nodes
tobiasraabe Oct 23, 2023
b8a16c4
fix.
tobiasraabe Oct 23, 2023
c96d58d
Add draft.
tobiasraabe Oct 25, 2023
7433efb
Merge branch 'main' into delayed-nodes
tobiasraabe Nov 8, 2023
ecb2e60
fix.
tobiasraabe Nov 8, 2023
cb32db1
Fix.
tobiasraabe Nov 8, 2023
61bda5c
Merge branch 'dag' into delayed-nodes
tobiasraabe Nov 8, 2023
41c76cf
Temp.
tobiasraabe Nov 8, 2023
918557a
Merge branch 'main' into delayed-nodes
tobiasraabe Nov 9, 2023
e71bc51
Allow executing tasks with delayednodes.
tobiasraabe Nov 9, 2023
3ca95df
Allow tasks to depend on delayed nodes.
tobiasraabe Nov 9, 2023
ca77a19
to changes.
tobiasraabe Nov 10, 2023
4337700
fix.
tobiasraabe Nov 10, 2023
c2db3c5
Fix.
tobiasraabe Nov 10, 2023
104aa5f
remove allow_delayed.
tobiasraabe Nov 10, 2023
43afe7b
Merge remote-tracking branch 'origin/main' into delayed-nodes
tobiasraabe Nov 10, 2023
9d70e3f
Fix test.
tobiasraabe Nov 10, 2023
0928a7e
extend test.
tobiasraabe Nov 10, 2023
a7ccb23
Merge branch 'main' into delayed-nodes
tobiasraabe Nov 10, 2023
f2d236b
Add new hook.
tobiasraabe Nov 11, 2023
fc60e00
Merge branch 'main' into delayed-nodes
tobiasraabe Nov 11, 2023
3b8ef38
fix.
tobiasraabe Nov 11, 2023
318a0b7
Merge branch 'main' into delayed-nodes
tobiasraabe Nov 11, 2023
948b743
Fixes.
tobiasraabe Nov 12, 2023
9d14eff
fix.
tobiasraabe Nov 12, 2023
701b46d
Allow tasks to depend on other tasks.
tobiasraabe Nov 13, 2023
ab9fd8d
Fix.
tobiasraabe Nov 13, 2023
8cf360d
Fix.
tobiasraabe Nov 13, 2023
9f61f92
Fix paths.
tobiasraabe Nov 13, 2023
bd0bb6f
Fix.
tobiasraabe Nov 14, 2023
413ffba
fix.
tobiasraabe Nov 14, 2023
fc9d40f
Merge remote-tracking branch 'origin/main' into allow-to-depend-on-taks
tobiasraabe Nov 14, 2023
649aa24
Fix.
tobiasraabe Nov 14, 2023
a013135
Merge branch 'allow-to-depend-on-taks' into delayed-nodes
tobiasraabe Nov 14, 2023
8353c7e
fix.
tobiasraabe Nov 14, 2023
e1cd75a
Fix.
tobiasraabe Nov 15, 2023
137144c
Fix.
tobiasraabe Nov 15, 2023
4903307
Fix.
tobiasraabe Nov 15, 2023
92e9990
Raise errors gracefully when recreating the DAG.
tobiasraabe Nov 15, 2023
71dd738
Fix.
tobiasraabe Nov 15, 2023
e078927
temp.
tobiasraabe Nov 15, 2023
640b465
Merge branch 'main' into allow-to-depend-on-taks
tobiasraabe Nov 16, 2023
880e95a
Merge branch 'allow-to-depend-on-taks' into delayed-nodes
tobiasraabe Nov 16, 2023
e7ab6ce
fix test.
tobiasraabe Nov 16, 2023
4ab7e0f
Get preliminary version of task generators to work.
tobiasraabe Nov 16, 2023
cabb531
Temp.
tobiasraabe Nov 16, 2023
3ab8bac
Temp.
tobiasraabe Nov 16, 2023
11b18ec
Merge branch 'main' into allow-to-depend-on-taks
tobiasraabe Nov 16, 2023
7fe2f27
Merge branch 'allow-to-depend-on-taks' into delayed-nodes
tobiasraabe Nov 16, 2023
7644b35
Fix.
tobiasraabe Nov 16, 2023
f4c78c6
Merge branch 'main' into delayed-nodes
tobiasraabe Nov 16, 2023
2bff286
Fix.
tobiasraabe Nov 17, 2023
c1c1e7a
Refine task generators even more.
tobiasraabe Nov 17, 2023
421db8e
fix some.
tobiasraabe Nov 18, 2023
1d9e85b
Add docs.
tobiasraabe Nov 22, 2023
ac51dcf
Merge remote-tracking branch 'origin/main' into delayed-nodes
tobiasraabe Nov 23, 2023
4da17c8
Fix.
tobiasraabe Nov 25, 2023
6b256d3
Merge branch 'main' into delayed-nodes
tobiasraabe Nov 25, 2023
f172992
Rename delayed nodes to provisional nodes.
tobiasraabe Nov 25, 2023
6cc4a28
Make PDelayedNode independent from PNode.
tobiasraabe Nov 25, 2023
bfd7d38
Rename DelayedPathNode to DirectoryNode.
tobiasraabe Nov 25, 2023
80bd1ed
Add test for data catalog.
tobiasraabe Nov 25, 2023
23c7362
Adjust more types.
tobiasraabe Nov 25, 2023
89e1014
FIx types.
tobiasraabe Nov 25, 2023
c5eb99e
Update docs.
tobiasraabe Nov 25, 2023
dd4ab17
Remove talking about delayed.
tobiasraabe Nov 25, 2023
89e0588
Merge branch 'main' into delayed-nodes
tobiasraabe Jan 28, 2024
432f782
update
tobiasraabe Jan 28, 2024
fe64020
Fix.
tobiasraabe Jan 28, 2024
e7ca69d
Fix.
tobiasraabe Jan 29, 2024
5b4cba2
Rename generator to is_generator.
tobiasraabe Jan 30, 2024
deb45d1
Fix side-effect in tests.
tobiasraabe Feb 4, 2024
86f227d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 4, 2024
11e3484
Fix.
tobiasraabe Feb 4, 2024
ab47e5d
Fix.
tobiasraabe Feb 4, 2024
b683da5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 4, 2024
f56fa68
Fix docs and coverage.
tobiasraabe Feb 4, 2024
017ec9c
Merge remote-tracking branch 'refs/remotes/origin/delayed-nodes' into…
tobiasraabe Feb 4, 2024
240c47e
Add test for task generator in notebook.
tobiasraabe Feb 4, 2024
7d8829f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 4, 2024
6f18094
Merge branch 'main' into delayed-nodes
tobiasraabe Feb 4, 2024
11f5647
Merge remote-tracking branch 'refs/remotes/origin/delayed-nodes' into…
tobiasraabe Feb 4, 2024
01a00c2
FIx.
tobiasraabe Feb 4, 2024
0bff197
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 4, 2024
c3b6122
Fix.
tobiasraabe Feb 4, 2024
d76c716
no pragma.
tobiasraabe Feb 4, 2024
c001a9d
Remove ways to generate tasks based on returns.
tobiasraabe Feb 4, 2024
f80d3ef
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 4, 2024
315f354
Fix.
tobiasraabe Feb 4, 2024
0d41ac6
Merge remote-tracking branch 'refs/remotes/origin/delayed-nodes' into…
tobiasraabe Feb 4, 2024
ebabd84
Merge branch 'main' into delayed-nodes
tobiasraabe Mar 9, 2024
ef42e6e
Fix.
tobiasraabe Mar 9, 2024
eacecf4
Change.
tobiasraabe Mar 9, 2024
43486d7
Fix.
tobiasraabe Mar 10, 2024
c3b5596
Improve docs.
tobiasraabe Mar 13, 2024
d218e5a
Merge branch 'main' into delayed-nodes
tobiasraabe Mar 13, 2024
696e931
Merge branch 'main' into delayed-nodes
tobiasraabe Mar 15, 2024
bf95f75
Fix.
tobiasraabe Mar 15, 2024
bf4fcbd
Merge branch 'delayed-nodes' of https://github.com/pytask-dev/pytask …
tobiasraabe Mar 15, 2024
6e5679c
Last fixes.
tobiasraabe Mar 15, 2024
1 change: 1 addition & 0 deletions docs/source/changes.md
Expand Up @@ -70,6 +70,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
- {pull}`485` adds missing steps to unconfigure pytask after the job is done, which
caused flaky tests.
- {pull}`486` adds default names to {class}`~pytask.PPathNode`.
- {pull}`487` implements task generators and provisional nodes.
- {pull}`488` raises an error when an invalid value is used in a return annotation.
- {pull}`489` and {pull}`491` simplifies parsing products and does not raise an error
when a product annotation is used with the argument name `produces`. And allow
Expand Down
1 change: 1 addition & 0 deletions docs/source/how_to_guides/index.md
Expand Up @@ -19,6 +19,7 @@ capture_warnings
how_to_influence_build_order
hashing_inputs_of_tasks
using_task_returns
provisional_nodes_and_task_generators
writing_custom_nodes
extending_pytask
the_data_catalog
Expand Down
94 changes: 94 additions & 0 deletions docs/source/how_to_guides/provisional_nodes_and_task_generators.md
@@ -0,0 +1,94 @@
# Provisional nodes and task generators

pytask's execution model can usually be separated into three phases.

1. Collection of tasks, dependencies, and products.
1. Building the DAG.
1. Executing the tasks.

But, in some situations, pytask needs to be more flexible.

Imagine you want to download a folder with files from an online storage. Before the task
is completed you do not know the total number of files or their filenames. How can you
still describe the files as products of the task?

And how would you define another task that depends on these files?

The following sections will explain how you use pytask in these situations.

## Producing provisional nodes

As an example of the scenario above, let us write a task that downloads all files
without a file extension from the root folder of the pytask GitHub repository. The files
are downloaded to a folder called `downloads`. Because `downloads` is a relative path,
it is resolved relative to the folder of the task module.

```{literalinclude} ../../../docs_src/how_to_guides/provisional_products.py
---
emphasize-lines: 4, 22
---
```

Since the names of the files are not known when pytask is started, we need to use a
{class}`~pytask.DirectoryNode` to define the task's product. With a
{class}`~pytask.DirectoryNode` we can specify where pytask can find the files. The files
are described with a root path (default is the directory of the task module) and a glob
pattern (default is `*`).
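
To make the combination of root path and glob pattern concrete, here is a minimal,
standard-library-only sketch of how such a pair can be resolved to files once they
exist. `collect_files` is a hypothetical helper written for this illustration, not part
of pytask, and restricting the result to regular files is an assumption of the sketch.

```python
import tempfile
from pathlib import Path


def collect_files(root_dir: Path, pattern: str = "*") -> list[Path]:
    """Return the files below ``root_dir`` that match the glob ``pattern``."""
    # Hypothetical helper: sort for a stable order and keep regular files only.
    return sorted(p for p in root_dir.glob(pattern) if p.is_file())


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for the ``downloads`` folder filled by the download task.
    downloads = Path(tmp, "downloads")
    downloads.mkdir()
    for name in ("CITATION", "LICENSE"):
        downloads.joinpath(name).write_text(f"content of {name}")

    found_names = [p.name for p in collect_files(downloads)]
```

The point of the sketch is that only the location and the pattern are fixed up front;
the concrete file names appear when the glob is evaluated.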

When we use the {class}`~pytask.DirectoryNode` as a product annotation, we get access to
the `root_dir` as a {class}`~pathlib.Path` object inside the function, which allows us
to store the files.

```{note}
The {class}`~pytask.DirectoryNode` is a provisional node that implements
{class}`~pytask.PProvisionalNode`. A provisional node is not a {class}`~pytask.PNode`,
but when its {meth}`~pytask.PProvisionalNode.collect` method is called, it returns
actual nodes. A {class}`~pytask.DirectoryNode`, for example, returns
{class}`~pytask.PathNode`.
```
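
The idea of a placeholder that turns into actual nodes only when asked can be sketched
with a plain protocol. The names `CollectsNodes` and `GlobPlaceholder` below are
hypothetical and exist only for this illustration; the real
{class}`~pytask.PProvisionalNode` protocol may require more attributes than a `collect`
method.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol, runtime_checkable


@runtime_checkable
class CollectsNodes(Protocol):
    """Hypothetical stand-in for a provisional-node protocol."""

    def collect(self) -> list[Path]: ...


@dataclass
class GlobPlaceholder:
    """Knows where files will be stored, not which files exist."""

    root_dir: Path
    pattern: str = "*"

    def collect(self) -> list[Path]:
        # Resolved late, after the producing step has written its files.
        return sorted(self.root_dir.glob(self.pattern))


with tempfile.TemporaryDirectory() as tmp:
    placeholder = GlobPlaceholder(root_dir=Path(tmp))
    is_provisional = isinstance(placeholder, CollectsNodes)

    before = placeholder.collect()  # Nothing has been produced yet.
    Path(tmp, "LICENSE").write_text("license text")
    after_names = [p.name for p in placeholder.collect()]
```

Calling `collect` twice gives different results because the placeholder stores no file
list of its own; it only describes where to look.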

## Depending on provisional nodes

In the next step, we want to define a task that consumes and merges all previously
downloaded files into one file.

The difficulty is how to reference the downloaded files before they have been
downloaded.

```{literalinclude} ../../../docs_src/how_to_guides/provisional_task.py
---
emphasize-lines: 9
---
```

To reference the files that will be downloaded, we use the
{class}`~pytask.DirectoryNode` as a dependency. Before the task is executed, the list of
files in the folder defined by the root path and the pattern is automatically collected
and passed to the task.

If we use a {class}`~pytask.DirectoryNode` with the same `root_dir` and `pattern` in
both tasks, pytask will automatically recognize that the second task depends on the
first. If that is not true, you might need to make this dependency more explicit by
using {func}`@task(after=...) <pytask.task>`, which is explained {ref}`here <after>`.
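
One way to picture why matching arguments are enough: if the identity of a node is
derived only from `root_dir` and `pattern`, two separately constructed nodes with equal
arguments count as the same node in the graph. The sketch below is illustrative and uses
only the standard library; `DirectorySpec` and its `signature` property are hypothetical,
and pytask's actual identity computation may differ.

```python
import hashlib
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DirectorySpec:
    """Hypothetical description of a directory plus a glob pattern."""

    root_dir: Path
    pattern: str = "*"

    @property
    def signature(self) -> str:
        # Assumption of this sketch: identity depends on both arguments only.
        raw_key = f"{self.root_dir.as_posix()}::{self.pattern}"
        return hashlib.sha256(raw_key.encode()).hexdigest()


product_of_download = DirectorySpec(Path("downloads"), "*")
dependency_of_merge = DirectorySpec(Path("downloads"), "*")
other_dependency = DirectorySpec(Path("downloads"), "*.txt")

# Equal arguments lead to the same identity, so the tasks can be linked.
same_node = product_of_download.signature == dependency_of_merge.signature
# A different pattern describes a different node; no link is inferred.
different_node = product_of_download.signature != other_dependency.signature
```

Under this reading, a dependency declared with a different pattern would not be linked
automatically, which is the situation where an explicit ordering becomes necessary.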

## Task generators

What if we wanted to process each downloaded file separately instead of dealing with
them in one task?

For that, we have to write a task generator to define an unknown number of tasks for an
unknown number of downloaded files.

A task generator is a task function in which we define more tasks, just as if we were
writing functions in a task module.

In the following code snippet, each generated task takes one of the downloaded files and
copies its content to a `.txt` file.

```{literalinclude} ../../../docs_src/how_to_guides/provisional_task_generator.py
```

```{important}
The generated tasks need to be decorated with {func}`@task <pytask.task>` to be
collected.
```
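
The loop in the task generator attaches a different source and target path to each
generated function through its annotations. The following standard-library sketch shows
that pairing without pytask. `generate_copy_functions` is a hypothetical helper, and
reading the annotations inside the loop is a choice of this sketch so that each function
is inspected while the loop variables still hold the values of its own iteration.

```python
from pathlib import Path
from typing import Annotated, get_type_hints


def generate_copy_functions(paths: list[Path]) -> list[tuple[Path, Path]]:
    """Define one function per path and report the paths bound to it."""
    bound_paths = []
    for path in paths:
        # The path of the copy will be CITATION.txt, for example.
        path_to_copy = path.with_suffix(".txt")

        def copy_file(
            source: Annotated[Path, path],
        ) -> Annotated[str, path_to_copy]:
            return source.read_text()

        # Inspect the metadata stored in the annotations of this function.
        hints = get_type_hints(copy_file, include_extras=True)
        bound_paths.append(
            (hints["source"].__metadata__[0], hints["return"].__metadata__[0])
        )
    return bound_paths


pairs = [
    (str(source), str(target))
    for source, target in generate_copy_functions(
        [Path("CITATION"), Path("LICENSE")]
    )
]
```

Each entry in `pairs` belongs to exactly one generated function, which is what allows a
single generator to describe one task per downloaded file.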
4 changes: 4 additions & 0 deletions docs/source/reference_guides/api.md
Expand Up @@ -190,6 +190,8 @@ Protocols define how tasks and nodes for dependencies and products have to be se
:show-inheritance:
.. autoprotocol:: pytask.PTaskWithPath
:show-inheritance:
.. autoprotocol:: pytask.PProvisionalNode
:show-inheritance:
```

## Nodes
Expand All @@ -203,6 +205,8 @@ Nodes are the interface for different kinds of dependencies or products.
:members:
.. autoclass:: pytask.PythonNode
:members:
.. autoclass:: pytask.DirectoryNode
:members:
```

To parse dependencies and products from nodes, use the following functions.
Expand Down
6 changes: 3 additions & 3 deletions docs/source/tutorials/skipping_tasks.md
Expand Up @@ -13,18 +13,18 @@ skip tasks during development that take too much time to compute right now.
```{literalinclude} ../../../docs_src/tutorials/skipping_tasks_example_1.py
```

Not only will this task be skipped, but all tasks that depend on
Not only will this task be skipped, but all tasks depending on
`time_intensive_product.pkl`.

## Conditional skipping

In large projects, you may have many long-running tasks that you only want to execute on
a remote server but not when you are not working locally.
a remote server, but not when you are working locally.

In this case, use the {func}`@pytask.mark.skipif <pytask.mark.skipif>` decorator, which
requires a condition and a reason as arguments.

Place the condition variable in a different module than the task, so you can change it
Place the condition variable in a module different from the task so you can change it
without causing a rerun of the task.

```python
Expand Down
33 changes: 33 additions & 0 deletions docs_src/how_to_guides/provisional_products.py
@@ -0,0 +1,33 @@
from pathlib import Path

import httpx
from pytask import DirectoryNode
from pytask import Product
from typing_extensions import Annotated


def get_files_without_file_extensions_from_repo() -> list[str]:
    url = "https://api.github.com/repos/pytask-dev/pytask/git/trees/main"
    response = httpx.get(url)
    elements = response.json()["tree"]
    return [
        e["path"]
        for e in elements
        if e["type"] == "blob" and Path(e["path"]).suffix == ""
    ]


def task_download_files(
    download_folder: Annotated[
        Path, DirectoryNode(root_dir=Path("downloads"), pattern="*"), Product
    ],
) -> None:
    """Download files."""
    # Contains names like CITATION or LICENSE.
    files_to_download = get_files_without_file_extensions_from_repo()

    for file_ in files_to_download:
        # The URL needs a scheme; httpx rejects URLs without one.
        url = "https://raw.githubusercontent.com/pytask-dev/pytask/main"
        response = httpx.get(url=f"{url}/{file_}", timeout=5)
        content = response.text
        download_folder.joinpath(file_).write_text(content)
14 changes: 14 additions & 0 deletions docs_src/how_to_guides/provisional_task.py
@@ -0,0 +1,14 @@
from pathlib import Path

from pytask import DirectoryNode
from typing_extensions import Annotated


def task_merge_files(
    paths: Annotated[
        list[Path], DirectoryNode(root_dir=Path("downloads"), pattern="*")
    ],
) -> Annotated[str, Path("all_text.txt")]:
    """Merge files."""
    contents = [path.read_text() for path in paths]
    return "\n".join(contents)
21 changes: 21 additions & 0 deletions docs_src/how_to_guides/provisional_task_generator.py
@@ -0,0 +1,21 @@
from pathlib import Path

from pytask import DirectoryNode
from pytask import task
from typing_extensions import Annotated


@task(is_generator=True)
def task_copy_files(
    paths: Annotated[
        list[Path], DirectoryNode(root_dir=Path("downloads"), pattern="*")
    ],
) -> None:
    """Create tasks to copy each file to a ``.txt`` file."""
    for path in paths:
        # The path of the copy will be CITATION.txt, for example.
        path_to_copy = path.with_suffix(".txt")

        @task
        def copy_file(path: Annotated[Path, path]) -> Annotated[str, path_to_copy]:
            return path.read_text()
86 changes: 60 additions & 26 deletions src/_pytask/collect.py
Expand Up @@ -30,7 +30,9 @@
from _pytask.mark_utils import has_mark
from _pytask.node_protocols import PNode
from _pytask.node_protocols import PPathNode
from _pytask.node_protocols import PProvisionalNode
from _pytask.node_protocols import PTask
from _pytask.nodes import DirectoryNode
from _pytask.nodes import PathNode
from _pytask.nodes import PythonNode
from _pytask.nodes import Task
Expand Down Expand Up @@ -299,6 +301,8 @@ def pytask_collect_task(
raise ValueError(msg)

path_nodes = Path.cwd() if path is None else path.parent

# Collect dependencies and products.
dependencies = parse_dependencies_from_task_function(
session, path, name, path_nodes, obj
)
Expand All @@ -309,6 +313,9 @@ def pytask_collect_task(
markers = get_all_marks(obj)
collection_id = obj.pytask_meta._id if hasattr(obj, "pytask_meta") else None
after = obj.pytask_meta.after if hasattr(obj, "pytask_meta") else []
is_generator = (
obj.pytask_meta.is_generator if hasattr(obj, "pytask_meta") else False
)

# Get the underlying function to avoid having different states of the function,
# e.g. due to pytask_meta, in different layers of the wrapping.
Expand All @@ -321,7 +328,11 @@ def pytask_collect_task(
depends_on=dependencies,
produces=products,
markers=markers,
attributes={"collection_id": collection_id, "after": after},
attributes={
"collection_id": collection_id,
"after": after,
"is_generator": is_generator,
},
)
return Task(
base_name=name,
Expand All @@ -330,41 +341,25 @@ def pytask_collect_task(
depends_on=dependencies,
produces=products,
markers=markers,
attributes={"collection_id": collection_id, "after": after},
attributes={
"collection_id": collection_id,
"after": after,
"is_generator": is_generator,
},
)
if isinstance(obj, PTask):
return obj
return None


_TEMPLATE_ERROR: str = """\
The provided path of the dependency/product is

{}

, but the path of the file on disk is

{}

Case-sensitive file systems would raise an error because the upper and lower case \
format of the paths does not match.

Please, align the names to ensure reproducibility on case-sensitive file systems \
(often Linux or macOS) or disable this error with 'check_casing_of_paths = false' in \
the pyproject.toml file.

Hint: If parts of the path preceding your project directory are not properly \
formatted, check whether you need to call `.resolve()` on `SRC`, `BLD` or other paths \
created from the `__file__` attribute of a module.
"""


_TEMPLATE_ERROR_DIRECTORY: str = """\
The path '{path}' points to a directory, although only files are allowed."""


@hookimpl(trylast=True)
def pytask_collect_node(session: Session, path: Path, node_info: NodeInfo) -> PNode: # noqa: C901, PLR0912
def pytask_collect_node( # noqa: C901, PLR0912
session: Session, path: Path, node_info: NodeInfo
) -> PNode | PProvisionalNode:
"""Collect a node of a task as a :class:`pytask.PNode`.

Strings are assumed to be paths. This might be a strict assumption, but since this
Expand All @@ -384,6 +379,21 @@ def pytask_collect_node(session: Session, path: Path, node_info: NodeInfo) -> PN
"""
node = node_info.value

if isinstance(node, DirectoryNode):
if node.root_dir is None:
node.root_dir = path
if (
not node.name
or node.name == node.root_dir.joinpath(node.pattern).as_posix()
):
short_root_dir = shorten_path(
node.root_dir, session.config["paths"] or (session.config["root"],)
)
node.name = Path(short_root_dir, node.pattern).as_posix()

if isinstance(node, PProvisionalNode):
return node

if isinstance(node, PythonNode):
node.node_info = node_info
if not node.name:
Expand Down Expand Up @@ -418,9 +428,11 @@ def pytask_collect_node(session: Session, path: Path, node_info: NodeInfo) -> PN
raise ValueError(_TEMPLATE_ERROR_DIRECTORY.format(path=node.path))

if isinstance(node, PNode):
if not node.name:
node.name = create_name_of_python_node(node_info)
return node

if isinstance(node, UPath):
if isinstance(node, UPath): # pragma: no cover
if not node.protocol:
node = Path(node)
else:
Expand Down Expand Up @@ -459,6 +471,28 @@ def pytask_collect_node(session: Session, path: Path, node_info: NodeInfo) -> PN
return PythonNode(value=node, name=node_name, node_info=node_info)


_TEMPLATE_ERROR: str = """\
The provided path of the dependency/product is

{}

, but the path of the file on disk is

{}

Case-sensitive file systems would raise an error because the upper and lower case \
format of the paths does not match.

Please, align the names to ensure reproducibility on case-sensitive file systems \
(often Linux or macOS) or disable this error with 'check_casing_of_paths = false' in \
your pytask configuration file.

Hint: If parts of the path preceding your project directory are not properly \
formatted, check whether you need to call `.resolve()` on `SRC`, `BLD` or other paths \
created from the `__file__` attribute of a module.
"""


def _raise_error_if_casing_of_path_is_wrong(
path: Path, check_casing_of_paths: bool
) -> None:
Expand Down