Skip to content

Various cleanups to make the docs better. #2093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 43 additions & 31 deletions cwltool/checker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""Static checking of CWL workflow connectivity."""

from collections import namedtuple
from collections.abc import Iterator, MutableMapping, MutableSequence, Sized
from typing import Any, Literal, Optional, Union, cast
from typing import Any, Literal, NamedTuple, Optional, Union, cast

from schema_salad.exceptions import ValidationException
from schema_salad.sourceline import SourceLine, bullets, strip_dup_lineno
Expand Down Expand Up @@ -184,8 +183,8 @@ def static_checker(
for param in workflow_inputs + step_outputs:
src_dict[cast(str, param["id"])] = param

step_inputs_val = check_all_types(src_dict, step_inputs, "source", param_to_step)
workflow_outputs_val = check_all_types(
step_inputs_val = _check_all_types(src_dict, step_inputs, "source", param_to_step)
workflow_outputs_val = _check_all_types(
src_dict, workflow_outputs, "outputSource", param_to_step
)

Expand All @@ -199,27 +198,34 @@ def static_checker(
sink = warning.sink
linkMerge = warning.linkMerge
sinksf = sorted(
p["pattern"] for p in sink.get("secondaryFiles", []) if p.get("required", True)
cast(str, p["pattern"])
for p in cast(MutableSequence[CWLObjectType], sink.get("secondaryFiles", []))
if p.get("required", True)
)
srcsf = sorted(
cast(str, p["pattern"])
for p in cast(MutableSequence[CWLObjectType], src.get("secondaryFiles", []))
)
srcsf = sorted(p["pattern"] for p in src.get("secondaryFiles", []))
# Every secondaryFile required by the sink, should be declared
# by the source
missing = missing_subset(srcsf, sinksf)
src_name = shortname(cast(str, src["id"]))
sink_id = cast(str, sink["id"])
sink_name = shortname(sink_id)
if missing:
msg1 = "Parameter '{}' requires secondaryFiles {} but".format(
shortname(sink["id"]),
sink_name,
missing,
)
msg3 = SourceLine(src, "id").makeError(
"source '%s' does not provide those secondaryFiles." % (shortname(src["id"]))
"source '%s' does not provide those secondaryFiles." % (src_name)
)
msg4 = SourceLine(src.get("_tool_entry", src), "secondaryFiles").makeError(
"To resolve, add missing secondaryFiles patterns to definition of '%s' or"
% (shortname(src["id"]))
% (src_name)
)
msg5 = SourceLine(sink.get("_tool_entry", sink), "secondaryFiles").makeError(
"mark missing secondaryFiles in definition of '%s' as optional."
% shortname(sink["id"])
"mark missing secondaryFiles in definition of '%s' as optional." % (sink_name)
)
msg = SourceLine(sink).makeError(
"{}\n{}".format(msg1, bullets([msg3, msg4, msg5], " "))
Expand All @@ -229,13 +235,13 @@ def static_checker(
msg = SourceLine(sink, "type").makeError(
"'%s' is not an input parameter of %s, expected %s"
% (
shortname(sink["id"]),
param_to_step[sink["id"]]["run"],
sink_name,
param_to_step[sink_id]["run"],
", ".join(
shortname(cast(str, s["id"]))
for s in cast(
list[dict[str, Union[str, bool]]],
param_to_step[sink["id"]]["inputs"],
param_to_step[sink_id]["inputs"],
)
if not s.get("not_connected")
),
Expand All @@ -247,12 +253,11 @@ def static_checker(
msg = (
SourceLine(src, "type").makeError(
"Source '%s' of type %s may be incompatible"
% (shortname(src["id"]), json_dumps(src["type"]))
% (src_name, json_dumps(src["type"]))
)
+ "\n"
+ SourceLine(sink, "type").makeError(
" with sink '%s' of type %s"
% (shortname(sink["id"]), json_dumps(sink["type"]))
" with sink '%s' of type %s" % (sink_name, json_dumps(sink["type"]))
)
)
if linkMerge is not None:
Expand All @@ -274,12 +279,12 @@ def static_checker(
msg = (
SourceLine(src, "type").makeError(
"Source '%s' of type %s is incompatible"
% (shortname(src["id"]), json_dumps(src["type"]))
% (shortname(cast(str, src["id"])), json_dumps(src["type"]))
)
+ "\n"
+ SourceLine(sink, "type").makeError(
" with sink '{}' of type {}".format(
shortname(sink["id"]), json_dumps(sink["type"])
shortname(cast(str, sink["id"])), json_dumps(sink["type"])
)
)
)
Expand All @@ -291,16 +296,17 @@ def static_checker(
exception_msgs.append(msg)

for sink in step_inputs:
sink_type = cast(Union[str, list[str], list[CWLObjectType], CWLObjectType], sink["type"])
if (
"null" != sink["type"]
and "null" not in sink["type"]
"null" != sink_type
and "null" not in sink_type
and "source" not in sink
and "default" not in sink
and "valueFrom" not in sink
):
msg = SourceLine(sink).makeError(
"Required parameter '%s' does not have source, default, or valueFrom expression"
% shortname(sink["id"])
% shortname(cast(str, sink["id"]))
)
exception_msgs.append(msg)

Expand All @@ -313,23 +319,29 @@ def static_checker(
raise ValidationException(all_exception_msg)


SrcSink = namedtuple("SrcSink", ["src", "sink", "linkMerge", "message"])
class _SrcSink(NamedTuple):
"""An error or warning message about a connection between two points of the workflow graph."""

src: CWLObjectType
sink: CWLObjectType
linkMerge: Optional[str]
message: Optional[str]


def check_all_types(
def _check_all_types(
src_dict: dict[str, CWLObjectType],
sinks: MutableSequence[CWLObjectType],
sourceField: Union[Literal["source"], Literal["outputSource"]],
param_to_step: dict[str, CWLObjectType],
) -> dict[str, list[SrcSink]]:
) -> dict[str, list[_SrcSink]]:
"""
Given a list of sinks, check if their types match with the types of their sources.

:raises WorkflowException: if there is an unrecognized linkMerge value
(from :py:func:`check_types`)
:raises ValidationException: if a sourceField is missing
"""
validation: dict[str, list[SrcSink]] = {"warning": [], "exception": []}
validation: dict[str, list[_SrcSink]] = {"warning": [], "exception": []}
for sink in sinks:
if sourceField in sink:
valueFrom = cast(Optional[str], sink.get("valueFrom"))
Expand All @@ -356,7 +368,7 @@ def check_all_types(
srcs_of_sink += [src_dict[parm_id]]
if is_conditional_step(param_to_step, parm_id) and pickValue is None:
validation["warning"].append(
SrcSink(
_SrcSink(
src_dict[parm_id],
sink,
linkMerge,
Expand All @@ -380,7 +392,7 @@ def check_all_types(

if pickValue is not None:
validation["warning"].append(
SrcSink(
_SrcSink(
src_dict[parm_id],
sink,
linkMerge,
Expand All @@ -399,7 +411,7 @@ def check_all_types(
Union[list[str], CWLObjectType], snk_typ
): # Given our type names this works even if not a list
validation["warning"].append(
SrcSink(
_SrcSink(
src_dict[parm_id],
sink,
linkMerge,
Expand All @@ -419,11 +431,11 @@ def check_all_types(
check_result = check_types(src, sink, linkMerge, valueFrom)
if check_result == "warning":
validation["warning"].append(
SrcSink(src, sink, linkMerge, message=extra_message)
_SrcSink(src, sink, linkMerge, message=extra_message)
)
elif check_result == "exception":
validation["exception"].append(
SrcSink(src, sink, linkMerge, message=extra_message)
_SrcSink(src, sink, linkMerge, message=extra_message)
)

return validation
Expand Down
2 changes: 1 addition & 1 deletion cwltool/command_line_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def revmap_file(builder: Builder, outdir: str, f: CWLObjectType) -> Optional[CWL
)
revmap_f = builder.pathmapper.reversemap(path)

if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"):
if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"): # type: ignore[union-attr]
f["location"] = revmap_f[1]
elif (
uripath == outdir
Expand Down
2 changes: 1 addition & 1 deletion cwltool/cwlprov/ro.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def _relativise_files(
del structure["path"]

if structure.get("class") == "Directory":
# TODO: Generate anonymoys Directory with a "listing"
# TODO: Generate anonymous Directory with a "listing"
# pointing to the hashed files
del structure["location"]

Expand Down
7 changes: 7 additions & 0 deletions cwltool/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
from cwl_utils.errors import GraphTargetMissingException as GraphTargetMissingException
from cwl_utils.errors import WorkflowException as WorkflowException

__all__ = (
"GraphTargetMissingException",
"WorkflowException",
"UnsupportedRequirement",
"ArgumentException",
)


class UnsupportedRequirement(WorkflowException):
pass
Expand Down
2 changes: 1 addition & 1 deletion cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import warnings
from codecs import getwriter
from collections.abc import Mapping, MutableMapping, MutableSequence, Sized
from importlib.resources import files
from typing import IO, Any, Callable, Optional, Union, cast

import argcomplete
Expand Down Expand Up @@ -96,7 +97,6 @@
CWLOutputType,
HasReqsHints,
adjustDirObjs,
files,
normalizeFilesDirs,
processes_to_kill,
trim_listing,
Expand Down
24 changes: 15 additions & 9 deletions cwltool/mutation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from collections import namedtuple
from typing import cast
"""Support for InplaceUpdateRequirement."""

from typing import NamedTuple, cast

from .errors import WorkflowException
from .utils import CWLObjectType

MutationState = namedtuple("MutationState", ["generation", "readers", "stepname"])

class _MutationState(NamedTuple):
generation: int
readers: list[str]
stepname: str


_generation = "http://commonwl.org/cwltool#generation"

Expand All @@ -20,11 +26,11 @@ class MutationManager:

def __init__(self) -> None:
"""Initialize."""
self.generations: dict[str, MutationState] = {}
self.generations: dict[str, _MutationState] = {}

def register_reader(self, stepname: str, obj: CWLObjectType) -> None:
loc = cast(str, obj["location"])
current = self.generations.get(loc, MutationState(0, [], ""))
current = self.generations.get(loc, _MutationState(0, [], ""))
obj_generation = obj.get(_generation, 0)

if obj_generation != current.generation:
Expand All @@ -40,7 +46,7 @@ def register_reader(self, stepname: str, obj: CWLObjectType) -> None:

def release_reader(self, stepname: str, obj: CWLObjectType) -> None:
loc = cast(str, obj["location"])
current = self.generations.get(loc, MutationState(0, [], ""))
current = self.generations.get(loc, _MutationState(0, [], ""))
obj_generation = obj.get(_generation, 0)

if obj_generation != current.generation:
Expand All @@ -55,7 +61,7 @@ def release_reader(self, stepname: str, obj: CWLObjectType) -> None:

def register_mutation(self, stepname: str, obj: CWLObjectType) -> None:
loc = cast(str, obj["location"])
current = self.generations.get(loc, MutationState(0, [], ""))
current = self.generations.get(loc, _MutationState(0, [], ""))
obj_generation = obj.get(_generation, 0)

if len(current.readers) > 0:
Expand All @@ -73,11 +79,11 @@ def register_mutation(self, stepname: str, obj: CWLObjectType) -> None:
)
)

self.generations[loc] = MutationState(current.generation + 1, current.readers, stepname)
self.generations[loc] = _MutationState(current.generation + 1, current.readers, stepname)

def set_generation(self, obj: CWLObjectType) -> None:
loc = cast(str, obj["location"])
current = self.generations.get(loc, MutationState(0, [], ""))
current = self.generations.get(loc, _MutationState(0, [], ""))
obj[_generation] = current.generation

def unset_generation(self, obj: CWLObjectType) -> None:
Expand Down
Loading
Loading