-
-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathfunc_ingress.py
63 lines (50 loc) · 2.02 KB
/
func_ingress.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# Copyright (C) 2020-2022 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Ingress functional data for preprocessing."""
from CPAC.utils.strategy import Strategy
def connect_func_ingress(
    workflow,
    strat_list: list[Strategy],
    c,
    sub_dict,
    subject_id,
    input_creds_path,
    unique_id=None,
):
    """Create and connect a functional datasource workflow per strategy.

    For each strategy in ``strat_list``, a datasource workflow is built with
    ``strat._resource_pool.create_func_datasource``, its ``inputnode`` is
    populated, the ``"scan"`` iterable is set from the functional paths
    mapping, and the strategy's resource pool is updated with the workflow's
    ``outputspec.subject`` and ``outputspec.scan`` outputs. Finally
    ``strat.rpool.ingress_func_metadata()`` is called for that strategy.

    Parameters
    ----------
    workflow
        Not referenced in the function body.
    strat_list : list of Strategy
        Strategies to attach a functional datasource workflow to.
    c
        Configuration object; only
        ``c.pipeline_setup["working_directory"]["path"]`` is read here.
    sub_dict
        Mapping with the functional paths under ``"func"`` or, when that key
        is absent, under ``"rest"``.
    subject_id
        Value assigned to the datasource ``inputnode``'s ``subject`` input.
    input_creds_path
        Value assigned to the datasource ``inputnode``'s ``creds_path`` input.
    unique_id : optional
        When not ``None``, it is embedded in each datasource workflow's name.

    Returns
    -------
    tuple
        ``(strat.rpool.wf, diff, blip, fmap_rp_list)``, all taken from the
        last strategy iterated; the last three are whatever
        ``ingress_func_metadata()`` returned for it.

    Raises
    ------
    KeyError
        If ``strat_list`` is non-empty and ``sub_dict`` has neither a
        ``"func"`` nor a ``"rest"`` key.
    UnboundLocalError
        If ``strat_list`` is empty, because the returned names are only bound
        inside the loop.
    """
    for index, strategy in enumerate(strat_list):
        # "func" takes precedence; "rest" is only consulted as a fallback.
        scan_paths = sub_dict["func"] if "func" in sub_dict else sub_dict["rest"]

        # The strategy index keeps workflow names distinct across strategies.
        gather_name = (
            f"func_gather_{index}"
            if unique_id is None
            else f"func_gather_{unique_id}_{index}"
        )

        gather_wf = strategy._resource_pool.create_func_datasource(
            scan_paths, gather_name
        )

        gather_wf.inputs.inputnode.set(
            subject=subject_id,
            creds_path=input_creds_path,
            dl_dir=c.pipeline_setup["working_directory"]["path"],
        )

        # Iterate the datasource over every key of the functional paths mapping.
        scan_names = list(scan_paths.keys())
        gather_wf.get_node("inputnode").iterables = ("scan", scan_names)

        # Publish the datasource outputs under the "subject" and "scan" keys.
        strategy.update_resource_pool(
            {
                "subject": (gather_wf, "outputspec.subject"),
                "scan": (gather_wf, "outputspec.scan"),
            }
        )

        # Rebound on every pass, so only the last strategy's values survive.
        diff, blip, fmap_rp_list = strategy.rpool.ingress_func_metadata()

    return strategy.rpool.wf, diff, blip, fmap_rp_list