1
- '''
1
+ """
2
2
This module contains functionality for dependency resolution for constructing
3
3
the dependency graph of workflows.
4
- '''
4
+ """
5
5
6
6
import luigi
7
7
from luigi .contrib .postgres import PostgresTarget
10
10
11
11
# ==============================================================================
12
12
13
+
13
14
class TargetInfo (object ):
14
- '''
15
+ """
15
16
Class to be used for sending specification of which target, from which
16
17
task, to use, when stitching workflow tasks' outputs and inputs together.
17
- '''
18
+ """
18
19
task = None
19
20
path = None
20
21
target = None
@@ -25,21 +26,24 @@ def __init__(self, task, path, format=None, is_tmp=False):
25
26
self .target = luigi .LocalTarget (path , format , is_tmp )
26
27
27
28
def open (self , * args , ** kwargs ):
28
- '''
29
+ """
29
30
Forward open method, from luigi's target class
30
- '''
31
+ """
31
32
return self .target .open (* args , ** kwargs )
32
33
33
34
# ==============================================================================
34
35
36
+
35
37
class S3TargetInfo (TargetInfo ):
38
+
36
39
def __init__ (self , task , path , format = None , client = None ):
37
40
self .task = task
38
41
self .path = path
39
42
self .target = S3Target (path , format = format , client = client )
40
43
41
44
# ==============================================================================
42
45
46
+
43
47
class PostgresTargetInfo (TargetInfo ):
44
48
def __init__ (self , task , host , database , user , password , update_id , table = None , port = None ):
45
49
self .task = task
@@ -54,28 +58,30 @@ def __init__(self, task, host, database, user, password, update_id, table=None,
54
58
55
59
# ==============================================================================
56
60
61
+
57
62
class DependencyHelpers (object ):
58
- '''
63
+
64
+ """
59
65
Mixin implementing methods for supporting dynamic, and target-based
60
66
workflow definition, as opposed to the task-based one in vanilla luigi.
61
- '''
67
+ """
62
68
63
69
# --------------------------------------------------------
64
70
# Handle inputs
65
71
# --------------------------------------------------------
66
72
67
73
def requires (self ):
68
- '''
74
+ """
69
75
Implement luigi API method by returning upstream tasks
70
- '''
76
+ """
71
77
return self ._upstream_tasks ()
72
78
73
79
def _upstream_tasks (self ):
74
- '''
80
+ """
75
81
Extract upstream tasks from the TargetInfo objects
76
82
or functions returning those (or lists of both the earlier)
77
83
for use in luigi's requires() method.
78
- '''
84
+ """
79
85
upstream_tasks = []
80
86
for attrname , attrval in iteritems (self .__dict__ ):
81
87
if 'in_' == attrname [0 :3 ]:
@@ -84,11 +90,11 @@ def _upstream_tasks(self):
84
90
return upstream_tasks
85
91
86
92
def _parse_inputitem (self , val , tasks ):
87
- '''
93
+ """
88
94
Recursively loop through lists of TargetInfos, or
89
95
callables returning TargetInfos, or lists of ...
90
96
(repeat recursively) ... and return all tasks.
91
- '''
97
+ """
92
98
if callable (val ):
93
99
val = val ()
94
100
if isinstance (val , TargetInfo ):
@@ -108,17 +114,17 @@ def _parse_inputitem(self, val, tasks):
108
114
# --------------------------------------------------------
109
115
110
116
def output (self ):
111
- '''
117
+ """
112
118
Implement luigi API method
113
- '''
119
+ """
114
120
return self ._output_targets ()
115
121
116
122
def _output_targets (self ):
117
- '''
123
+ """
118
124
Extract output targets from the TargetInfo objects
119
125
or functions returning those (or lists of both the earlier)
120
126
for use in luigi's output() method.
121
- '''
127
+ """
122
128
output_targets = []
123
129
for attrname in dir (self ):
124
130
attrval = getattr (self , attrname )
@@ -128,11 +134,11 @@ def _output_targets(self):
128
134
return output_targets
129
135
130
136
def _parse_outputitem (self , val , targets ):
131
- '''
137
+ """
132
138
Recursively loop through lists of TargetInfos, or
133
139
callables returning TargetInfos, or lists of ...
134
140
(repeat recursively) ... and return all targets.
135
- '''
141
+ """
136
142
if callable (val ):
137
143
val = val ()
138
144
if isinstance (val , TargetInfo ):
0 commit comments