1
1
"""Visualize a CWL workflow."""
2
2
3
3
from collections .abc import Iterator
4
- from pathlib import Path
4
+ from importlib . resources import files
5
5
from typing import cast
6
6
from urllib .parse import urlparse
7
7
8
8
import pydot
9
9
import rdflib
10
10
11
- _queries_dir = (Path (__file__ ).parent / "rdfqueries" ).resolve ()
12
- _get_inner_edges_query_path = _queries_dir / "get_inner_edges.sparql"
13
- _get_input_edges_query_path = _queries_dir / "get_input_edges.sparql"
14
- _get_output_edges_query_path = _queries_dir / "get_output_edges.sparql"
15
- _get_root_query_path = _queries_dir / "get_root.sparql"
11
+
12
+ def _get_inner_edges_query () -> str :
13
+ return files ("cwltool" ).joinpath ("rdfqueries/get_inner_edges.sparql" ).read_text ()
14
+
15
+
16
+ def _get_input_edges_query () -> str :
17
+ return files ("cwltool" ).joinpath ("rdfqueries/get_input_edges.sparql" ).read_text ()
18
+
19
+
20
+ def _get_output_edges_query () -> str :
21
+ return files ("cwltool" ).joinpath ("rdfqueries/get_output_edges.sparql" ).read_text ()
22
+
23
+
24
+ def _get_root_query () -> str :
25
+ return files ("cwltool" ).joinpath ("rdfqueries/get_root.sparql" ).read_text ()
16
26
17
27
18
28
class CWLViewer :
@@ -33,8 +43,7 @@ def _load_cwl_graph(self, rdf_description: str) -> rdflib.graph.Graph:
33
43
return rdf_graph
34
44
35
45
def _set_inner_edges (self ) -> None :
36
- with open (_get_inner_edges_query_path ) as f :
37
- get_inner_edges_query = f .read ()
46
+ get_inner_edges_query = _get_inner_edges_query ()
38
47
inner_edges = cast (
39
48
Iterator [rdflib .query .ResultRow ],
40
49
self ._rdf_graph .query (
@@ -96,8 +105,7 @@ def _set_inner_edges(self) -> None:
96
105
)
97
106
98
107
def _set_input_edges (self ) -> None :
99
- with open (_get_input_edges_query_path ) as f :
100
- get_input_edges_query = f .read ()
108
+ get_input_edges_query = _get_input_edges_query ()
101
109
inputs_subgraph = pydot .Subgraph (graph_name = "cluster_inputs" )
102
110
self ._dot_graph .add_subgraph (inputs_subgraph )
103
111
inputs_subgraph .set ("rank" , "same" )
@@ -124,8 +132,7 @@ def _set_input_edges(self) -> None:
124
132
self ._dot_graph .add_edge (pydot .Edge (str (input_row ["input" ]), str (input_row ["step" ])))
125
133
126
134
def _set_output_edges (self ) -> None :
127
- with open (_get_output_edges_query_path ) as f :
128
- get_output_edges = f .read ()
135
+ get_output_edges = _get_output_edges_query ()
129
136
outputs_graph = pydot .Subgraph (graph_name = "cluster_outputs" )
130
137
self ._dot_graph .add_subgraph (outputs_graph )
131
138
outputs_graph .set ("rank" , "same" )
@@ -152,8 +159,7 @@ def _set_output_edges(self) -> None:
152
159
self ._dot_graph .add_edge (pydot .Edge (output_edge_row ["step" ], output_edge_row ["output" ]))
153
160
154
161
def _get_root_graph_uri (self ) -> rdflib .term .Identifier :
155
- with open (_get_root_query_path ) as f :
156
- get_root_query = f .read ()
162
+ get_root_query = _get_root_query ()
157
163
root = cast (
158
164
list [rdflib .query .ResultRow ],
159
165
list (
0 commit comments