2
2
import copy
3
3
import functools
4
4
import subprocess
5
+ from pathlib import Path
5
6
from typing import Iterable
6
7
from typing import Optional
7
8
from typing import Union
12
13
from _pytask .nodes import FilePathNode
13
14
from _pytask .nodes import PythonFunctionTask
14
15
from _pytask .parametrize import _copy_func
15
- from _pytask .shared import to_list
16
16
17
17
18
18
def r (options : Optional [Union [str , Iterable [str ]]] = None ):
@@ -31,10 +31,9 @@ def r(options: Optional[Union[str, Iterable[str]]] = None):
31
31
return options
32
32
33
33
34
- def run_r_script (depends_on , r ):
34
+ def run_r_script (r ):
35
35
"""Run an R script."""
36
- script = to_list (depends_on )[0 ]
37
- subprocess .run (["Rscript" , script .as_posix (), * r ], check = True )
36
+ subprocess .run (r , check = True )
38
37
39
38
40
39
@hookimpl
@@ -50,32 +49,37 @@ def pytask_collect_task(session, path, name, obj):
50
49
task = PythonFunctionTask .from_path_name_function_session (
51
50
path , name , obj , session
52
51
)
52
+
53
+ return task
54
+
55
+
56
+ @hookimpl
57
+ def pytask_collect_task_teardown (session , task ):
58
+ """Perform some checks."""
59
+ if get_specific_markers_from_task (task , "r" ):
60
+ source = _get_node_from_dictionary (task .depends_on , "source" )
61
+ if isinstance (source , FilePathNode ) and source .value .suffix not in [".r" , ".R" ]:
62
+ raise ValueError (
63
+ "The first dependency of an R task must be the executable script."
64
+ )
65
+
53
66
r_function = _copy_func (run_r_script )
54
67
r_function .pytaskmark = copy .deepcopy (task .function .pytaskmark )
55
68
56
69
merged_marks = _merge_all_markers (task )
57
70
args = r (* merged_marks .args , ** merged_marks .kwargs )
58
- r_function = functools .partial (r_function , r = args )
71
+ options = _prepare_cmd_options (session , task , args )
72
+ r_function = functools .partial (r_function , r = options )
59
73
60
74
task .function = r_function
61
75
62
- return task
63
-
64
76
65
- @hookimpl
66
- def pytask_collect_task_teardown (task ):
67
- """Perform some checks.
68
-
69
- Remove is task is none check with pytask 0.0.9.
70
-
71
- """
72
- if task is not None and get_specific_markers_from_task (task , "r" ):
73
- if isinstance (task .depends_on [0 ], FilePathNode ) and task .depends_on [
74
- 0
75
- ].value .suffix not in [".r" , ".R" ]:
76
- raise ValueError (
77
- "The first dependency of an R task must be the executable script."
78
- )
77
+ def _get_node_from_dictionary (obj , key , fallback = 0 ):
78
+ if isinstance (obj , Path ):
79
+ pass
80
+ elif isinstance (obj , dict ):
81
+ obj = obj .get (key ) or obj .get (fallback )
82
+ return obj
79
83
80
84
81
85
def _merge_all_markers (task ):
@@ -85,3 +89,14 @@ def _merge_all_markers(task):
85
89
for mark_ in r_marks [1 :]:
86
90
mark = mark .combined_with (mark_ )
87
91
return mark
92
+
93
+
94
+ def _prepare_cmd_options (session , task , args ):
95
+ """Prepare the command line arguments to execute the do-file.
96
+
97
+ The last entry changes the name of the log file. We take the task id as a name which
98
+ is unique and does not cause any errors when parallelizing the execution.
99
+
100
+ """
101
+ source = _get_node_from_dictionary (task .depends_on , session .config ["r_source_key" ])
102
+ return ["Rscript" , source .value .as_posix (), * args ]
0 commit comments