diff --git a/.gitignore b/.gitignore index ef302c113..c28b7513a 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ eggs/ .eggs/ *.egg-info/ *.egg +.tox/ # Editor Temps .*.sw? diff --git a/cwltool/main.py b/cwltool/main.py index 638b90b16..622abcc85 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -30,6 +30,7 @@ from . import draft2tool from .builder import adjustFileObjs, adjustDirObjs from .stdfsaccess import StdFsAccess +from .pack import pack _logger = logging.getLogger("cwltool") @@ -495,74 +496,9 @@ def makeRelative(ob): stdout.write(json.dumps(deps, indent=4)) -def flatten_deps(d, files): # type: (Any, Set[Text]) -> None - if isinstance(d, list): - for s in d: - flatten_deps(s, files) - elif isinstance(d, dict): - files.add(d["location"]) - if "secondaryFiles" in d: - flatten_deps(d["secondaryFiles"], files) - -def find_run(d, runs): # type: (Any, Set[Text]) -> None - if isinstance(d, list): - for s in d: - find_run(s, runs) - elif isinstance(d, dict): - if "run" in d and isinstance(d["run"], (Text, Text)): - runs.add(d["run"]) - for s in d.values(): - find_run(s, runs) - -def replace_refs(d, rewrite, stem, newstem): - # type: (Any, Dict[Text, Text], Text, Text) -> None - if isinstance(d, list): - for s,v in enumerate(d): - if isinstance(v, (str, Text)) and v.startswith(stem): - d[s] = newstem + v[len(stem):] - else: - replace_refs(v, rewrite, stem, newstem) - elif isinstance(d, dict): - if "run" in d and isinstance(d["run"], (str, Text)): - d["run"] = rewrite[d["run"]] - for s,v in d.items(): - if isinstance(v, (str, Text)) and v.startswith(stem): - d[s] = newstem + v[len(stem):] - replace_refs(v, rewrite, stem, newstem) - def print_pack(document_loader, processobj, uri, metadata): - # type: (Loader, Any, Text, Dict[Text, Text]) -> Text - def loadref(b, u): - # type: (Text, Text) -> Union[Dict, List, Text] - return document_loader.resolve_ref(u, base_url=b)[0] - deps = scandeps(uri, processobj, set(("run",)), set(), loadref) - - fdeps = set((uri,)) - flatten_deps(deps, fdeps) - - runs = set() # type: Set[Text] - for f in fdeps: - find_run(document_loader.idx[f], runs) - - rewrite = {} - if isinstance(processobj, list): - for p in processobj: - rewrite[p["id"]] = "#" + shortname(p["id"]) - else: - rewrite[uri] = "#main" - - for r in runs: - rewrite[r] = "#" + shortname(r) - - packed = {"$graph": [], "cwlVersion": metadata["cwlVersion"] - } # type: Dict[Text, Any] - for r,v in rewrite.items(): - dc = cast(Dict[Text, Any], copy.deepcopy(document_loader.idx[r])) - dc["id"] = v - dc["name"] = v - replace_refs(dc, rewrite, r+"/" if "#" in r else r+"#", v+"/") - packed["$graph"].append(dc) - + # type: (Loader, Union[Dict[unicode, Any], List[Dict[unicode, Any]]], unicode, Dict[unicode, Any]) -> str + packed = pack(document_loader, processobj, uri, metadata) if len(packed["$graph"]) > 1: return json.dumps(packed, indent=4) else: diff --git a/cwltool/pack.py b/cwltool/pack.py new file mode 100644 index 000000000..7541c1162 --- /dev/null +++ b/cwltool/pack.py @@ -0,0 +1,84 @@ +import copy +import json + +from schema_salad.ref_resolver import Loader + +from .process import scandeps, shortname + +from typing import Union, Any, cast, Callable, Dict, Tuple, Type, IO, Text + +def flatten_deps(d, files): # type: (Any, Set[Text]) -> None + if isinstance(d, list): + for s in d: + flatten_deps(s, files) + elif isinstance(d, dict): + files.add(d["location"]) + if "secondaryFiles" in d: + flatten_deps(d["secondaryFiles"], files) + +def find_run(d, runs): # type: (Any, Set[Text]) -> None + if isinstance(d, list): + for s in d: + find_run(s, runs) + elif isinstance(d, dict): + if "run" in d and isinstance(d["run"], (str, unicode)): + runs.add(d["run"]) + for s in d.values(): + find_run(s, runs) + +def replace_refs(d, rewrite, stem, newstem): + # type: (Any, Dict[Text, Text], Text, Text) -> None + if isinstance(d, list): + for s,v in enumerate(d): + if isinstance(v, (str, unicode)) and v.startswith(stem): + d[s] = newstem + v[len(stem):] + else: + replace_refs(v, rewrite, stem, newstem) + elif isinstance(d, dict): + if "package" in d: + raise Exception("where the fuck did this come from %s" % json.dumps(d, indent=4)) + if "run" in d and isinstance(d["run"], (str, unicode)): + d["run"] = rewrite[d["run"]] + for s,v in d.items(): + if isinstance(v, (str, unicode)) and v.startswith(stem): + d[s] = newstem + v[len(stem):] + replace_refs(v, rewrite, stem, newstem) + +def pack(document_loader, processobj, uri, metadata): + # type: (Loader, Union[Dict[Text, Any], List[Dict[Text, Any]]], Text, Dict[Text, Text]) -> Dict[Text, Any] + def loadref(b, u): + # type: (Text, Text) -> Union[Dict, List, Text] + return document_loader.resolve_ref(u, base_url=b)[0] + deps = scandeps(uri, processobj, set(("run",)), set(), loadref) + + fdeps = set((uri,)) + flatten_deps(deps, fdeps) + + runs = set() # type: Set[Text] + for f in fdeps: + find_run(document_loader.idx[f], runs) + + rewrite = {} + if isinstance(processobj, list): + for p in processobj: + rewrite[p["id"]] = "#" + shortname(p["id"]) + else: + rewrite[uri] = "#main" + + for r in runs: + rewrite[r] = "#" + shortname(r) + + packed = {"$graph": [], "cwlVersion": metadata["cwlVersion"] + } # type: Dict[Text, Any] + + for r in sorted(rewrite.keys()): + v = rewrite[r] + dc = cast(Dict[Text, Any], copy.deepcopy(document_loader.idx[r])) + dc["id"] = v + for n in ("name", "package", "cwlVersion"): + if n in dc: + del dc[n] + replace_refs(dc, rewrite, r+"/" if "#" in r else r+"#", v+"/") + packed["$graph"].append(dc) + + return packed diff --git a/cwltool/process.py b/cwltool/process.py index 716e955ea..38959070d 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -312,6 +312,17 @@ class Process(object): def __init__(self, toolpath_object, **kwargs): # type: (Dict[Text, Any], **Any) -> None + """ + kwargs: + + metadata: tool document metadata + requirements: inherited requirements + hints: inherited hints + loader: schema_salad.ref_resolver.Loader used to load tool document + avsc_names: CWL Avro schema object used to validate document + strict: flag to determine strict validation (fail on unrecognized fields) + """ + self.metadata = kwargs.get("metadata", {}) # type: Dict[Text,Any] self.names = None # type: avro.schema.Names @@ -338,6 +349,9 @@ def __init__(self, toolpath_object, **kwargs): if "loader" in kwargs: self.formatgraph = kwargs["loader"].graph + self.doc_loader = kwargs["loader"] + self.doc_schema = kwargs["avsc_names"] + checkRequirements(self.tool, supportedProcessRequirements) self.validate_hints(kwargs["avsc_names"], self.tool.get("hints", []), strict=kwargs.get("strict")) @@ -395,6 +409,22 @@ def __init__(self, toolpath_object, **kwargs): def _init_job(self, joborder, **kwargs): # type: (Dict[Text, Text], **Any) -> Builder + """ + kwargs: + + eval_timeout: javascript evaluation timeout + use_container: do/don't use Docker when DockerRequirement hint provided + make_fs_access: make an FsAccess() object with given basedir + basedir: basedir for FsAccess + docker_outdir: output directory inside docker for this job + docker_tmpdir: tmpdir inside docker for this job + docker_stagedir: stagedir inside docker for this job + outdir: outdir on host for this job + tmpdir: tmpdir on host for this job + stagedir: stagedir on host for this job + select_resources: callback to select compute resources + """ + builder = Builder() builder.job = cast(Dict[Text, Union[Dict[Text, Any], List, Text]], copy.deepcopy(joborder)) diff --git a/cwltool/workflow.py b/cwltool/workflow.py index b91e957c6..8426ca0b2 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -443,6 +443,7 @@ def __init__(self, toolpath_object, pos, **kwargs): u"Tool definition %s failed validation:\n%s" % (toolpath_object["run"], validate.indent(str(v)))) + self.tool = toolpath_object = copy.deepcopy(toolpath_object) for stepfield, toolfield in (("in", "inputs"), ("out", "outputs")): toolpath_object[toolfield] = [] for step_entry in toolpath_object[stepfield]: diff --git a/tests/test_pack.py b/tests/test_pack.py new file mode 100644 index 000000000..1c4eed63c --- /dev/null +++ b/tests/test_pack.py @@ -0,0 +1,17 @@ +import unittest +import json +from cwltool.load_tool import fetch_document, validate_document +import cwltool.pack +import cwltool.workflow + +class TestPack(unittest.TestCase): + def test_pack(self): + self.maxDiff = None + + document_loader, workflowobj, uri = fetch_document("tests/wf/revsort.cwl") + document_loader, avsc_names, processobj, metadata, uri = validate_document( + document_loader, workflowobj, uri) + packed = cwltool.pack.pack(document_loader, processobj, uri, metadata) + with open("tests/wf/expect_packed.cwl") as f: + expect_packed = json.load(f) + self.assertEqual(expect_packed, packed) diff --git a/tests/wf/expect_packed.cwl b/tests/wf/expect_packed.cwl new file mode 100644 index 000000000..b1e0879cc --- /dev/null +++ b/tests/wf/expect_packed.cwl @@ -0,0 +1,125 @@ +{ + "cwlVersion": "v1.0", + "$graph": [ + { + "inputs": [ + { + "doc": "The input file to be processed.", + "type": "File", + "id": "#main/input" + }, + { + "default": true, + "doc": "If true, reverse (decending) sort", + "type": "boolean", + "id": "#main/reverse_sort" + } + ], + "doc": "Reverse the lines in a document, then sort those lines.", + "class": "Workflow", + "steps": [ + { + "out": [ + "#main/rev/output" + ], + "run": "#revtool.cwl", + "id": "#main/rev", + "in": [ + { + "source": "#main/input", + "id": "#main/rev/input" + } + ] + }, + { + "out": [ + "#main/sorted/output" + ], + "run": "#sorttool.cwl", + "id": "#main/sorted", + "in": [ + { + "source": "#main/rev/output", + "id": "#main/sorted/input" + }, + { + "source": "#main/reverse_sort", + "id": "#main/sorted/reverse" + } + ] + } + ], + "outputs": [ + { + "outputSource": "#main/sorted/output", + "type": "File", + "id": "#main/output", + "doc": "The output with the lines reversed and sorted." + } + ], + "id": "#main", + "hints": [ + { + "dockerPull": "debian:8", + "class": "DockerRequirement" + } + ] + }, + { + "inputs": [ + { + "inputBinding": {}, + "type": "File", + "id": "#revtool.cwl/input" + } + ], + "stdout": "output.txt", + "doc": "Reverse each line using the `rev` command", + "baseCommand": "rev", + "class": "CommandLineTool", + "outputs": [ + { + "outputBinding": { + "glob": "output.txt" + }, + "type": "File", + "id": "#revtool.cwl/output" + } + ], + "id": "#revtool.cwl" + }, + { + "inputs": [ + { + "inputBinding": { + "position": 1, + "prefix": "--reverse" + }, + "type": "boolean", + "id": "#sorttool.cwl/reverse" + }, + { + "inputBinding": { + "position": 2 + }, + "type": "File", + "id": "#sorttool.cwl/input" + } + ], + "stdout": "output.txt", + "doc": "Sort lines using the `sort` command", + "baseCommand": "sort", + "class": "CommandLineTool", + "outputs": [ + { + "outputBinding": { + "glob": "output.txt" + }, + "type": "File", + "id": "#sorttool.cwl/output" + } + ], + "id": "#sorttool.cwl" + } + ] +} \ No newline at end of file diff --git a/tests/wf/revsort-job.json b/tests/wf/revsort-job.json new file mode 100644 index 000000000..f5671aab2 --- /dev/null +++ b/tests/wf/revsort-job.json @@ -0,0 +1,6 @@ +{ + "input": { + "class": "File", + "location": "whale.txt" + } +} diff --git a/tests/wf/revsort.cwl b/tests/wf/revsort.cwl new file mode 100644 index 000000000..a6b2774ad --- /dev/null +++ b/tests/wf/revsort.cwl @@ -0,0 +1,65 @@ +# +# This is a two-step workflow which uses "revtool" and "sorttool" defined above. +# +class: Workflow +doc: "Reverse the lines in a document, then sort those lines." +cwlVersion: v1.0 + +# Requirements & hints specify prerequisites and extensions to the workflow. +# In this example, DockerRequirement specifies a default Docker container +# in which the command line tools will execute. +hints: + - class: DockerRequirement + dockerPull: debian:8 + + +# The inputs array defines the structure of the input object that describes +# the inputs to the workflow. +# +# The "reverse_sort" input parameter demonstrates the "default" field. If the +# field "reverse_sort" is not provided in the input object, the default value will +# be used. +inputs: + input: + type: File + doc: "The input file to be processed." + reverse_sort: + type: boolean + default: true + doc: "If true, reverse (decending) sort" + +# The "outputs" array defines the structure of the output object that describes +# the outputs of the workflow. +# +# Each output field must be connected to the output of one of the workflow +# steps using the "connect" field. Here, the parameter "#output" of the +# workflow comes from the "#sorted" output of the "sort" step. +outputs: + output: + type: File + outputSource: sorted/output + doc: "The output with the lines reversed and sorted." + +# The "steps" array lists the executable steps that make up the workflow. +# The tool to execute each step is listed in the "run" field. +# +# In the first step, the "inputs" field of the step connects the upstream +# parameter "#input" of the workflow to the input parameter of the tool +# "revtool.cwl#input" +# +# In the second step, the "inputs" field of the step connects the output +# parameter "#reversed" from the first step to the input parameter of the +# tool "sorttool.cwl#input". +steps: + rev: + in: + input: input + out: [output] + run: revtool.cwl + + sorted: + in: + input: rev/output + reverse: reverse_sort + out: [output] + run: sorttool.cwl diff --git a/tests/wf/revtool.cwl b/tests/wf/revtool.cwl new file mode 100644 index 000000000..7f279643a --- /dev/null +++ b/tests/wf/revtool.cwl @@ -0,0 +1,37 @@ +# +# Simplest example command line program wrapper for the Unix tool "rev". +# +class: CommandLineTool +cwlVersion: v1.0 +doc: "Reverse each line using the `rev` command" + +# The "inputs" array defines the structure of the input object that describes +# the inputs to the underlying program. Here, there is one input field +# defined that will be called "input" and will contain a "File" object. +# +# The input binding indicates that the input value should be turned into a +# command line argument. In this example inputBinding is an empty object, +# which indicates that the file name should be added to the command line at +# a default location. +inputs: + input: + type: File + inputBinding: {} + +# The "outputs" array defines the structure of the output object that +# describes the outputs of the underlying program. Here, there is one +# output field defined that will be called "output", must be a "File" type, +# and after the program executes, the output value will be the file +# output.txt in the designated output directory. +outputs: + output: + type: File + outputBinding: + glob: output.txt + +# The actual program to execute. +baseCommand: rev + +# Specify that the standard output stream must be redirected to a file called +# output.txt in the designated output directory. +stdout: output.txt diff --git a/tests/wf/sorttool.cwl b/tests/wf/sorttool.cwl new file mode 100644 index 000000000..a4853217b --- /dev/null +++ b/tests/wf/sorttool.cwl @@ -0,0 +1,35 @@ +# Example command line program wrapper for the Unix tool "sort" +# demonstrating command line flags. +class: CommandLineTool +doc: "Sort lines using the `sort` command" +cwlVersion: v1.0 + +# This example is similar to the previous one, with an additional input +# parameter called "reverse". It is a boolean parameter, which is +# intepreted as a command line flag. The value of "prefix" is used for +# flag to put on the command line if "reverse" is true, if "reverse" is +# false, no flag is added. +# +# This example also introduced the "position" field. This indicates the +# sorting order of items on the command line. Lower numbers are placed +# before higher numbers. Here, the "--reverse" flag (if present) will be +# added to the command line before the input file path. +inputs: + - id: reverse + type: boolean + inputBinding: + position: 1 + prefix: "--reverse" + - id: input + type: File + inputBinding: + position: 2 + +outputs: + - id: output + type: File + outputBinding: + glob: output.txt + +baseCommand: sort +stdout: output.txt