Skip to content

Commit 0e14ff2

Browse files
authored
Remove redundant calls of PNode.state(). (#571)
1 parent 9b27ec5 commit 0e14ff2

File tree

5 files changed

+28
-26
lines changed

5 files changed

+28
-26
lines changed

docs/source/changes.md

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
2020
- {pull}`566` makes universal-pathlib an official dependency.
2121
- {pull}`568` restricts `task_files` to a list of patterns and raises a better error.
2222
- {pull}`569` removes the hooks related to the creation of the DAG.
23+
- {pull}`571` removes redundant calls to `PNode.state()`, which cause a high penalty for
24+
remote files.
2325

2426
## 0.4.5 - 2024-01-09
2527

src/_pytask/database_utils.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,10 @@ def update_states_in_database(session: Session, task_signature: str) -> None:
6969
_create_or_update_state(task_signature, node.signature, hash_)
7070

7171

72-
def has_node_changed(task: PTask, node: PTask | PNode) -> bool:
72+
def has_node_changed(task: PTask, node: PTask | PNode, state: str | None) -> bool:
7373
"""Indicate whether a single dependency or product has changed."""
7474
# If node does not exist, we receive None.
75-
node_state = node.state()
76-
if node_state is None:
75+
if state is None:
7776
return True
7877

7978
with DatabaseSession() as session:
@@ -83,4 +82,4 @@ def has_node_changed(task: PTask, node: PTask | PNode) -> bool:
8382
if db_state is None:
8483
return True
8584

86-
return node_state != db_state.hash_
85+
return state != db_state.hash_

src/_pytask/execute.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None:
140140
node = dag.nodes[node_signature].get("task") or dag.nodes[
141141
node_signature
142142
].get("node")
143-
if node_signature in predecessors and not node.state():
143+
node_state = node.state()
144+
if node_signature in predecessors and not node_state:
144145
msg = f"{task.name!r} requires missing node {node.name!r}."
145146
if IS_FILE_SYSTEM_CASE_SENSITIVE:
146147
msg += (
@@ -149,7 +150,7 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None:
149150
)
150151
raise NodeNotFoundError(msg)
151152

152-
has_changed = has_node_changed(task=task, node=node)
153+
has_changed = has_node_changed(task=task, node=node, state=node_state)
153154
if has_changed:
154155
needs_to_be_executed = True
155156
break

src/_pytask/nodes.py

+13-17
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import hashlib
66
import inspect
77
import pickle
8+
from contextlib import suppress
89
from os import stat_result
910
from pathlib import Path # noqa: TCH003
1011
from typing import TYPE_CHECKING
@@ -75,12 +76,10 @@ def signature(self) -> str:
7576

7677
def state(self) -> str | None:
7778
"""Return the state of the node."""
78-
try:
79+
with suppress(OSError):
7980
source = inspect.getsource(self.function)
80-
except OSError:
81-
return None
82-
else:
8381
return hashlib.sha256(source.encode()).hexdigest()
82+
return None
8483

8584
def execute(self, **kwargs: Any) -> Any:
8685
"""Execute the task."""
@@ -137,10 +136,7 @@ def signature(self) -> str:
137136

138137
def state(self) -> str | None:
139138
"""Return the state of the node."""
140-
if self.path.exists():
141-
modification_time = self.path.stat().st_mtime
142-
return hash_path(self.path, modification_time)
143-
return None
139+
return _get_state(self.path)
144140

145141
def execute(self, **kwargs: Any) -> Any:
146142
"""Execute the task."""
@@ -180,9 +176,7 @@ def state(self) -> str | None:
180176
The state is given by the modification timestamp.
181177
182178
"""
183-
if self.path.exists():
184-
return _get_state(self.path)
185-
return None
179+
return _get_state(self.path)
186180

187181
def load(self, is_product: bool = False) -> Path: # noqa: ARG002
188182
"""Load the value."""
@@ -316,9 +310,7 @@ def from_path(cls, path: Path) -> PickleNode:
316310
return cls(name=path.as_posix(), path=path)
317311

318312
def state(self) -> str | None:
319-
if self.path.exists():
320-
return _get_state(self.path)
321-
return None
313+
return _get_state(self.path)
322314

323315
def load(self, is_product: bool = False) -> Any:
324316
if is_product:
@@ -331,15 +323,19 @@ def save(self, value: Any) -> None:
331323
pickle.dump(value, f)
332324

333325

334-
def _get_state(path: Path) -> str:
326+
def _get_state(path: Path) -> str | None:
335327
"""Get state of a path.
336328
337329
A simple function to handle local and remote files.
338330
339331
"""
340-
stat = path.stat()
332+
try:
333+
stat = path.stat()
334+
except FileNotFoundError:
335+
return None
336+
341337
if isinstance(stat, stat_result):
342-
modification_time = path.stat().st_mtime
338+
modification_time = stat.st_mtime
343339
return hash_path(path, modification_time)
344340
if isinstance(stat, UPathStatResult):
345341
return stat.as_info().get("ETag", "0")

src/_pytask/persist.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,25 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None:
3939
4040
"""
4141
if has_mark(task, "persist"):
42-
all_nodes_exist = all(
42+
all_states = [
4343
(
4444
session.dag.nodes[name].get("task") or session.dag.nodes[name]["node"]
4545
).state()
4646
for name in node_and_neighbors(session.dag, task.signature)
47-
)
47+
]
48+
all_nodes_exist = all(all_states)
4849

4950
if all_nodes_exist:
5051
any_node_changed = any(
5152
has_node_changed(
5253
task=task,
5354
node=session.dag.nodes[name].get("task")
5455
or session.dag.nodes[name]["node"],
56+
state=state,
57+
)
58+
for name, state in zip(
59+
node_and_neighbors(session.dag, task.signature), all_states
5560
)
56-
for name in node_and_neighbors(session.dag, task.signature)
5761
)
5862
if any_node_changed:
5963
raise Persisted

0 commit comments

Comments
 (0)