Skip to content

Commit 88a5fb1

Browse files
authored
Enable PathNode and PickleNode to deal with URLs, S3, etc.. (#525)
1 parent 132028f commit 88a5fb1

File tree

13 files changed

+198
-20
lines changed

13 files changed

+198
-20
lines changed

.pre-commit-config.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ repos:
9999
docs/source/how_to_guides/using_task_returns.md|
100100
docs/source/how_to_guides/writing_custom_nodes.md|
101101
docs/source/how_to_guides/hashing_inputs_of_tasks.md|
102+
docs/source/how_to_guides/remote_files.md|
102103
docs/source/reference_guides/hookspecs.md|
103104
docs/source/tutorials/configuration.md|
104105
docs/source/tutorials/debugging.md|

docs/source/changes.md

+8-7
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
1515
{func}`~pytask.task`. Closes {issue}`512`.
1616
- {pull}`522` improves the issue templates.
1717
- {pull}`523` refactors `_pytask.console._get_file`.
18-
- {pull}`524` improves some linting and formatter rules.
18+
- {pull}`524` improves some linting and formatting rules.
19+
- {pull}`525` enables pytask to work with remote files using universal_pathlib.
1920

2021
## 0.4.4 - 2023-12-04
2122

@@ -25,14 +26,14 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
2526
## 0.4.3 - 2023-12-01
2627

2728
- {pull}`483` simplifies the teardown of a task.
28-
- {pull}`484` raises more informative error when directories instead of files are used
29-
with path nodes.
30-
- {pull}`485` adds missing steps to unconfigure pytask after the job is done which
29+
- {pull}`484` raises an informative error message when directories instead of files are
30+
used with path nodes.
31+
- {pull}`485` adds missing steps to unconfigure pytask after the job is done, which
3132
caused flaky tests.
3233
- {pull}`486` adds default names to {class}`~pytask.PPathNode`.
3334
- {pull}`488` raises an error when an invalid value is used in a return annotation.
3435
- {pull}`489` and {pull}`491` simplifies parsing products and does not raise an error
35-
when a product annotation is used with the argument name `produces`. And, allow
36+
when a product annotation is used with the argument name `produces`. And allow
3637
`produces` to intake any node.
3738
- {pull}`490` refactors and better tests parsing of dependencies.
3839
- {pull}`493` allows tasks to depend on other tasks.
@@ -49,7 +50,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
4950

5051
## 0.4.2 - 2023-11-08
5152

52-
- {pull}`449` simplifies the code building the plugin manager.
53+
- {pull}`449` simplifies the code building of the plugin manager.
5354
- {pull}`451` improves `collect_command.py` and renames `graph.py` to `dag_command.py`.
5455
- {pull}`454` removes more `.svg`s and replaces them with animations.
5556
- {pull}`455` adds more explanation when {meth}`~pytask.PNode.load` fails during the
@@ -58,7 +59,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
5859
- {pull}`457` refactors everything around formatting node names.
5960
- {pull}`459` adds a pre-commit hook to sort `__all__`.
6061
- {pull}`460` simplifies removing internal tracebacks from exceptions with a cause.
61-
- {pull}`463` raise error when a task function is not defined inside the loop body.
62+
- {pull}`463` raises an error when a task function is not defined inside the loop body.
6263
- {pull}`464` improves pinned dependencies.
6364
- {pull}`465` adds test to ensure internal tracebacks are removed by reports.
6465
- {pull}`466` implements hashing for files instead of modification timestamps.

docs/source/how_to_guides/index.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ maxdepth: 1
1313
---
1414
migrating_from_scripts_to_pytask
1515
interfaces_for_dependencies_products
16+
remote_files
1617
functional_interface
1718
capture_warnings
1819
how_to_influence_build_order
+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# Remote files
2+
3+
So far, we have only dealt with local files in the tutorials and guides. But there are
4+
lots of use cases to deal with remote files.
5+
6+
- You distribute the workflow without the data and want to make it easy for others to
7+
get started. So, some tasks reference remote files instead of local files.
8+
- You store the workflow results in remote storage to save and distribute them.
9+
10+
pytask uses [universal_pathlib](https://github.com/fsspec/universal_pathlib) to work
11+
with remote files. The package provides a {mod}`pathlib`-like interface, making it very
12+
easy to interact with files from an HTTP(S)-, Dropbox-, S3-, GCP-, Azure-based
13+
filesystem, and many more.
14+
15+
:::{warning}
16+
universal_pathlib does currently not support Python 3.12. To track progress, see [this
17+
PR](https://github.com/fsspec/universal_pathlib/pull/152) and check the [releases
18+
`>0.1.4`](https://github.com/fsspec/universal_pathlib/releases)
19+
:::
20+
21+
## HTTP(S)-based filesystem
22+
23+
As an example for dealing with an HTTP(S)-based filesystem, we will download the iris
24+
data set and save it as a CSV file.
25+
26+
```{literalinclude} ../../../docs_src/how_to_guides/remote_files/https.py
27+
```
28+
29+
## Other filesystems
30+
31+
universal_pathlib supports Azure Storage, Dropbox, Google Cloud Storage, AWS S3, and
32+
[many more filesystems](https://github.com/fsspec/universal_pathlib#currently-supported-filesystems-and-schemes).
33+
34+
For example, let us try accessing a file in an S3 bucket. We pass `anon=True` to
35+
{class}`~upath.UPath` since no credentials are required.
36+
37+
```pycon
38+
>>> from upath import UPath
39+
>>> path = UPath("s3://upath-aws-example/iris.data", anon=True)
40+
>>> path.stat()
41+
ModuleNotFoundError
42+
...
43+
ImportError: Install s3fs to access S3
44+
```
45+
46+
Some filesystems are supported
47+
[out-of-the-box](https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations).
48+
[Others](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations)
49+
are available as plugins and require additional packages.
50+
51+
After installing s3fs, rerun the command.
52+
53+
```pycon
54+
>>> path.stat()
55+
{'ETag': '"42615765a885ddf54427f12c34a0a070"',
56+
'LastModified': datetime.datetime(2023, 12, 11, 23, 50, 3, tzinfo=tzutc()),
57+
'size': 4551,
58+
'name': 'upath-aws-example/iris.data',
59+
'type': 'file',
60+
'StorageClass': 'STANDARD',
61+
'VersionId': None,
62+
'ContentType': 'binary/octet-stream'}
63+
```
64+
65+
Usually, you will need credentials to access files. Search in
66+
[fsspec's documentation](https://filesystem-spec.readthedocs.io/en/latest)
67+
or the plugin's documentation, here
68+
[s3fs](https://s3fs.readthedocs.io/en/latest/#credentials), for information on
69+
authentication. One way would be to set the environment variables `AWS_ACCESS_KEY_ID`
70+
and `AWS_SECRET_ACCESS_KEY`.
71+
72+
## Detecting changes in remote files
73+
74+
pytask uses the [entity tag (ETag)](https://en.wikipedia.org/wiki/HTTP_ETag) to detect
75+
changes in remote files. The ETag is an optional header field that can signal a file has
76+
changed. For example,
77+
[AWS S3 uses an MD5 digest](https://teppen.io/2018/06/23/aws_s3_etags/) of the uploaded
78+
file as the ETag. If the file changes, so does the ETag, and pytask will detect it.
79+
80+
Many files on the web do not provide an ETag like this version of the iris dataset.
81+
82+
```pycon
83+
>>> import requests
84+
>>> url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
85+
>>> r = requests.head(url)
86+
>>> r.headers
87+
{'Server': 'nginx/1.25.3', 'Date': 'Sun, 10 Dec 2023 23:59:21 GMT', 'Connection': 'keep-alive'}
88+
```
89+
90+
In these instances, pytask does not recognize if the file has changed and only reruns
91+
the task if other conditions are not met, like the product is missing, the task module
92+
has changed, etc..

docs/source/tutorials/defining_dependencies_products.md

+4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ If you want to avoid type annotations for now, look at the tab named `produces`.
1414
The `Decorators` tab documents the deprecated approach that should not be used
1515
anymore and will be removed in version v0.5.
1616

17+
In this tutorial, we only deal with local files. If you want to use pytask with files
18+
online, S3, GCP, Azure, etc., read the
19+
{doc}`guide on remote files <../how_to_guides/remote_files>`.
20+
1721
First, we focus on defining products that should already be familiar to you. Then,
1822
we focus on how you can declare task dependencies.
1923

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from pathlib import Path
2+
from typing import Annotated
3+
4+
from upath import UPath
5+
6+
7+
url = UPath("https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")
8+
9+
10+
def task_download_file(path: UPath = url) -> Annotated[str, Path("data.csv")]:
11+
return path.read_text()

environment.yml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies:
2121
- sqlalchemy >=1.4.36
2222
- tomli >=1.0.0
2323
- typing_extensions
24+
- universal_pathlib
2425

2526
# Misc
2627
- deepdiff

pyproject.toml

+4
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ name = "Tobias Raabe"
5757
5858

5959
[project.optional-dependencies]
60+
all = ['universal-pathlib; python_version<"3.12"']
6061
docs = [
6162
"furo",
6263
"ipython",
@@ -76,6 +77,9 @@ test = [
7677
"pytest-cov",
7778
"pytest-xdist",
7879
"syrupy",
80+
# For HTTPPath tests.
81+
"aiohttp",
82+
"requests",
7983
]
8084

8185
[project.urls]

src/_pytask/collect.py

+22-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@
4141
from _pytask.typing import is_task_function
4242
from rich.text import Text
4343

44+
try:
45+
from upath import UPath
46+
except ImportError: # pragma: no cover
47+
48+
class UPath: # type: ignore[no-redef]
49+
...
50+
51+
4452
if TYPE_CHECKING:
4553
from _pytask.session import Session
4654
from _pytask.models import NodeInfo
@@ -310,7 +318,7 @@ def pytask_collect_task(
310318

311319

312320
@hookimpl(trylast=True)
313-
def pytask_collect_node(session: Session, path: Path, node_info: NodeInfo) -> PNode: # noqa: C901
321+
def pytask_collect_node(session: Session, path: Path, node_info: NodeInfo) -> PNode: # noqa: C901, PLR0912
314322
"""Collect a node of a task as a :class:`pytask.PNode`.
315323
316324
Strings are assumed to be paths. This might be a strict assumption, but since this
@@ -354,12 +362,24 @@ def pytask_collect_node(session: Session, path: Path, node_info: NodeInfo) -> PN
354362
node.path, session.config["paths"] or (session.config["root"],)
355363
)
356364

357-
if isinstance(node, PPathNode) and node.path.is_dir():
365+
# Skip ``is_dir`` for remote UPaths because it downloads the file and blocks the
366+
# collection.
367+
if (
368+
isinstance(node, PPathNode)
369+
and not isinstance(node.path, UPath)
370+
and node.path.is_dir()
371+
):
358372
raise ValueError(_TEMPLATE_ERROR_DIRECTORY.format(path=node.path))
359373

360374
if isinstance(node, PNode):
361375
return node
362376

377+
if isinstance(node, UPath):
378+
if not node.protocol:
379+
node = Path(node)
380+
else:
381+
return PathNode(name=node.name, path=node)
382+
363383
if isinstance(node, Path):
364384
if not node.is_absolute():
365385
node = path.joinpath(node)

src/_pytask/nodes.py

+17-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import hashlib
55
import inspect
66
import pickle
7+
from os import stat_result
78
from pathlib import Path # noqa: TCH003
89
from typing import Any
910
from typing import Callable
@@ -178,8 +179,14 @@ def state(self) -> str | None:
178179
179180
"""
180181
if self.path.exists():
181-
modification_time = self.path.stat().st_mtime
182-
return hash_path(self.path, modification_time)
182+
stat = self.path.stat()
183+
if isinstance(stat, stat_result):
184+
modification_time = self.path.stat().st_mtime
185+
return hash_path(self.path, modification_time)
186+
if isinstance(stat, dict):
187+
return stat.get("ETag", "0")
188+
msg = "Unknown stat object."
189+
raise NotImplementedError(msg)
183190
return None
184191

185192
def load(self, is_product: bool = False) -> Path: # noqa: ARG002
@@ -317,8 +324,14 @@ def from_path(cls, path: Path) -> PickleNode:
317324

318325
def state(self) -> str | None:
319326
if self.path.exists():
320-
modification_time = self.path.stat().st_mtime
321-
return hash_path(self.path, modification_time)
327+
stat = self.path.stat()
328+
if isinstance(stat, stat_result):
329+
modification_time = self.path.stat().st_mtime
330+
return hash_path(self.path, modification_time)
331+
if isinstance(stat, dict):
332+
return stat.get("ETag", "0")
333+
msg = "Unknown stat object."
334+
raise NotImplementedError(msg)
322335
return None
323336

324337
def load(self, is_product: bool = False) -> Any:

src/_pytask/path.py

+16-5
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ def relative_to(path: Path, source: Path, include_source: bool = True) -> Path:
5050
return Path(source_name, path.relative_to(source))
5151

5252

53-
def find_closest_ancestor(path: Path, potential_ancestors: Sequence[Path]) -> Path:
53+
def find_closest_ancestor(
54+
path: Path, potential_ancestors: Sequence[Path]
55+
) -> Path | None:
5456
"""Find the closest ancestor of a path.
5557
5658
In case a path is the path to the task file itself, we return the path.
@@ -76,10 +78,19 @@ def find_closest_ancestor(path: Path, potential_ancestors: Sequence[Path]) -> Pa
7678
if ancestor == path:
7779
return path
7880

79-
candidate = find_common_ancestor(path, ancestor)
80-
potential_closest_ancestors.append(candidate)
81-
82-
return sorted(potential_closest_ancestors, key=lambda x: len(x.parts))[-1]
81+
with contextlib.suppress(ValueError):
82+
candidate = find_common_ancestor(path, ancestor)
83+
potential_closest_ancestors.append(candidate)
84+
85+
return next(
86+
(
87+
i
88+
for i in sorted(
89+
potential_closest_ancestors, key=lambda x: len(x.parts), reverse=True
90+
)
91+
),
92+
None,
93+
)
8394

8495

8596
def find_common_ancestor(*paths: Path) -> Path:

tests/test_execute.py

+19
Original file line numberDiff line numberDiff line change
@@ -1027,3 +1027,22 @@ def func(path): path.touch()
10271027
result = runner.invoke(cli, [tmp_path.as_posix()])
10281028
assert result.exit_code == ExitCode.OK
10291029
assert tmp_path.joinpath("out.txt").exists()
1030+
1031+
1032+
@pytest.mark.skipif(sys.version_info >= (3, 12), reason="Not supported in Python 3.12.")
1033+
def test_with_http_path(runner, tmp_path):
1034+
source = """
1035+
from upath import UPath
1036+
from typing_extensions import Annotated
1037+
1038+
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
1039+
1040+
def task_example(path = UPath(url)) -> Annotated[str, UPath("data.txt")]:
1041+
return path.read_text()
1042+
"""
1043+
tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source))
1044+
1045+
result = runner.invoke(cli, [tmp_path.as_posix()])
1046+
print(result.output) # noqa: T201
1047+
assert result.exit_code == ExitCode.OK
1048+
assert tmp_path.joinpath("data.txt").exists()

tox.ini

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ passenv = CI
66
usedevelop = true
77

88
[testenv:pytest]
9-
extras = test
9+
extras = all, test
1010
deps =
1111
pygraphviz;platform_system != "Windows"
1212

1313
commands =
14-
pytest {posargs} -vv
14+
pytest {posargs}

0 commit comments

Comments
 (0)