From 85c0fa265db8e11f8d0007e4396b5d721a5568b7 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 11 Nov 2024 18:11:16 -0500 Subject: [PATCH 01/11] Revert "revert properties field modifiers (separte PR)" This reverts commit 1554874644025b072adc55326e09c8b3b471924d. --- .../builtin/properties_processor.cwl | 32 ++++ .../processes/builtin/properties_processor.py | 156 ++++++++++++++++++ weaver/processes/execution.py | 13 ++ 3 files changed, 201 insertions(+) create mode 100644 weaver/processes/builtin/properties_processor.cwl create mode 100644 weaver/processes/builtin/properties_processor.py diff --git a/weaver/processes/builtin/properties_processor.cwl b/weaver/processes/builtin/properties_processor.cwl new file mode 100644 index 000000000..30d7a715b --- /dev/null +++ b/weaver/processes/builtin/properties_processor.cwl @@ -0,0 +1,32 @@ +#! /usr/bin/env cwl-runner +cwlVersion: v1.0 +class: CommandLineTool +id: properties_processor +label: Properties Processor +doc: | + Generates properties contents using the specified input definitions. +# target the installed python pointing to weaver conda env to allow imports +baseCommand: ${WEAVER_ROOT_DIR}/bin/python +arguments: ["${WEAVER_ROOT_DIR}/weaver/processes/builtin/properties_processor.py", "-o", $(runtime.outdir)] +inputs: + properties: + doc: Properties definition submitted to the process and to be generated from input values. + type: File + format: "iana:application/json" + inputBinding: + prefix: -P + values: + doc: Values available for properties generation. + type: File + format: "iana:application/json" + inputBinding: + prefix: -V +outputs: + referenceOutput: + doc: Generated file contents from specified properties. + type: File + # note: important to omit 'format' here, since we want to preserve the flexibility to retrieve 'any' reference + outputBinding: + outputEval: $(runtime.outdir)/* +$namespaces: + iana: "https://www.iana.org/assignments/media-types/" diff --git a/weaver/processes/builtin/properties_processor.py b/weaver/processes/builtin/properties_processor.py new file mode 100644 index 000000000..71456f498 --- /dev/null +++ b/weaver/processes/builtin/properties_processor.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python +""" +Generates properties contents using the specified input definitions. +""" +import argparse +import ast +import json +import logging +import os +import sys +import uuid +from typing import TYPE_CHECKING + +CUR_DIR = os.path.abspath(os.path.dirname(__file__)) +sys.path.insert(0, CUR_DIR) +# root to allow 'from weaver import <...>' +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(CUR_DIR)))) + +# place weaver specific imports after sys path fixing to ensure they are found from external call +# pylint: disable=C0413,wrong-import-order +from weaver.formats import ContentType, get_cwl_file_format # isort:skip # noqa: E402 +from weaver.processes.builtin.utils import get_package_details # isort:skip # noqa: E402) +from weaver.utils import Lazify, load_file, repr_json, request_extra # isort:skip # noqa: E402 + +if TYPE_CHECKING: + from typing import Dict + + from weaver.typedefs import ( + CWL_IO_ValueMap, + JSON, + Path, + ) + from weaver.utils import LoggerHandler + +PACKAGE_NAME, PACKAGE_BASE, PACKAGE_MODULE = get_package_details(__file__) + +# setup logger since it is not run from the main 'weaver' app +LOGGER = logging.getLogger(PACKAGE_MODULE) +LOGGER.addHandler(logging.StreamHandler(sys.stdout)) +LOGGER.setLevel(logging.INFO) + +# process details +__version__ = "1.0" +__title__ = "Properties Processor" +__abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative + +OUTPUT_CWL_JSON = "cwl.output.json" + + +def compute_property(property_name, calculation, properties): + # type: (str, str, Dict[str, JSON]) -> None + + ... # FIXME: ast to do eval safely - TBD: what about property pointing at file? + calc = calculation.lower() # handle 'Min()'->'min()' - names allowed by "well-known functions" + result = ast.literal_eval(calc) + properties.update({property_name: result}) + + +def process_properties(input_properties, input_values, output_dir, logger=LOGGER): + # type: (Dict[str, str], Dict[str, JSON], Path, LoggerHandler) -> JSON + """ + Processor of a ``properties`` definition for an input or output. + + :param input_properties: + Properties definition submitted to the process and to be generated from input values. + :param input_values: + Values available for properties generation. + :param output_dir: Directory to write the output (provided by the :term:`CWL` definition). + :param logger: Optional logger handler to employ. + :return: File reference containing the resolved properties. + """ + logger.log( # pylint: disable=E1205 # false positive + logging.INFO, + "Process [{}] Got arguments: input_properties={}, input_values={} output_dir=[{}]", + PACKAGE_NAME, + Lazify(lambda: repr_json(input_properties, indent=2)), + Lazify(lambda: repr_json(input_values, indent=2)), + output_dir, + ) + os.makedirs(output_dir, exist_ok=True) + + # sort properties later if they depend on other ones, the least dependencies to be computed first + props_deps = {prop: 0 for prop in input_properties} + for prop, calc in input_properties.items(): + for prop_dep in props_deps: + if prop == prop_dep: + if prop in calc: + raise ValueError(f"Invalid recursive property [{prop}] references itself.") + continue + if prop_dep in calc: + props_deps[prop_dep] += 1 + if not filter(lambda dep: dep[-1] == 0, props_deps.items()): + raise ValueError("Invalid properties all depend on another one. Impossible resolution order.") + props = sorted( + list(input_properties.items()), + key=lambda p: props_deps[p[0]] + ) + + # compute the properties + properties = {} + for prop, calc in props: + compute_property(prop, calc, properties) + + return properties + + +def process_cwl(input_properties, input_values, output_dir): + # type: (Dict[str, str], Dict[str, JSON], Path) -> CWL_IO_ValueMap + out_props = process_properties(input_properties, input_values, output_dir) + prop_file_path = os.path.join(output_dir, f"{uuid.uuid4()}.json") + with open(prop_file_path, mode="w", encoding="utf-8") as prop_file: + json.dump(out_props, prop_file, indent=2) + out_cwl_file = { + "class": "File", + "path": prop_file_path, + "format": get_cwl_file_format(ContentType.APP_JSON), + } + cwl_outputs = {"referenceOutput": out_cwl_file} # output ID must match the one used in CWL definition + cwl_out_path = os.path.join(output_dir, OUTPUT_CWL_JSON) + with open(cwl_out_path, mode="w", encoding="utf-8") as file: + json.dump(cwl_outputs, file) + return cwl_outputs + + +def main(*args): + # type: (*str) -> None + LOGGER.info("Process [%s] Parsing inputs...", PACKAGE_NAME) + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "-P", "--properties", + metavar="INPUT_PROPERTIES", + required=True, + help="Properties definition submitted to the process and to be generated from input values.", + ) + parser.add_argument( + "-V", "--values", + metavar="INPUT_VALUES", + required=True, + help="Values available for properties generation.", + ) + parser.add_argument( + "-o", "--outdir", + metavar="OUTDIR", + required=True, + help="Output directory of the retrieved data.", + ) + ns = parser.parse_args(*args) + LOGGER.info("Process [%s] Loading properties input from file '%s'.", PACKAGE_NAME, ns.properties) + prop_in = load_file(ns.properties) + LOGGER.info("Process [%s] Loading values input from file '%s'.", PACKAGE_NAME, ns.values) + val_in = load_file(ns.values) + sys.exit(process_cwl(prop_in, val_in, ns.outdir) is not None) + + +if __name__ == "__main__": + main() diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 2cd347cea..dc9a35746 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -38,6 +38,7 @@ from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode from weaver.processes import wps_package from weaver.processes.builtin.collection_processor import process_collection +from weaver.processes.builtin.properties_processor import process_properties from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema from weaver.processes.convert import ( convert_input_values_schema, @@ -669,6 +670,18 @@ def parse_wps_inputs(wps_process, job, container=None): else: resolved_input_values = [(input_value, input_info)] + # post-handling of properties + properties = input_value.get("properties") if isinstance(input_value, dict) else None + if properties: + input_prop_path = os.path.join(job.tmpdir, "inputs", input_id) + # FIXME: handle other cross-input refs? + # (ie: parametrized I/O in https://docs.ogc.org/DRAFTS/21-009.html#section_deployable_workflows) + input_prop_values = {input_id: resolved_input_values} + resolved_input_values = process_properties( + properties, + input_prop_values, + input_prop_path, + ) resolved_inputs.extend(resolved_input_values) for input_value, input_info in resolved_inputs: From c85662581dc1a79ea6b92f0bd9bda1a4ce3d86bf Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 12 Nov 2024 16:08:08 -0500 Subject: [PATCH 02/11] [wip] add properties field modifiers parsing --- tests/functional/test_wps_package.py | 174 +++++++++++++++++- tests/functional/utils.py | 44 ++--- .../processes/builtin/collection_processor.py | 30 ++- .../builtin/field_modifier_processor.cwl | 54 ++++++ ...ocessor.py => field_modifier_processor.py} | 129 ++++++++++--- .../builtin/properties_processor.cwl | 32 ---- weaver/processes/execution.py | 25 ++- weaver/typedefs.py | 39 +++- weaver/wps_restapi/swagger_definitions.py | 75 +++++++- 9 files changed, 477 insertions(+), 125 deletions(-) create mode 100644 weaver/processes/builtin/field_modifier_processor.cwl rename weaver/processes/builtin/{properties_processor.py => field_modifier_processor.py} (52%) delete mode 100644 weaver/processes/builtin/properties_processor.cwl diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 9a34fc448..e095af3a3 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -79,7 +79,7 @@ from weaver.wps_restapi import swagger_definitions as sd if TYPE_CHECKING: - from typing import List + from typing import Dict, List from responses import RequestsMock @@ -1185,6 +1185,7 @@ def test_deploy_block_builtin_processes_from_api(self): process = self.process_store.fetch_by_id(self._testMethodName) assert process.type == ProcessType.APPLICATION + @pytest.mark.oap_part2 @parameterized.expand([ # not allowed even if combined with another known and valid definition ({"UnknownRequirement": {}, CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}}, ), @@ -1239,6 +1240,7 @@ def test_deploy_requirement_inline_javascript(self): _, cwl = self.deploy_process(body, describe_schema=ProcessSchema.OGC) assert CWL_REQUIREMENT_INLINE_JAVASCRIPT in cwl["requirements"] + @pytest.mark.oap_part2 def test_deploy_merge_complex_io_with_multiple_formats_and_defaults(self): """ Test validates that different format types are set on different input variations simultaneously: @@ -1586,6 +1588,7 @@ def test_deploy_merge_complex_io_with_multiple_formats_and_defaults(self): assert pkg["outputs"][3]["type"] == {"type": "array", "items": "File"} assert "format" not in pkg["outputs"][3], "CWL format array not allowed for outputs." + @pytest.mark.oap_part2 def test_deploy_merge_resolution_io_min_max_occurs(self): """ Test validates that various merging/resolution strategies of I/O definitions are properly applied for @@ -1755,6 +1758,7 @@ def test_deploy_merge_resolution_io_min_max_occurs(self): assert pkg["inputs"][13]["id"] == "optional_array_max_fixed_by_wps" # assert pkg["inputs"][13]["type"] == "string[]?" + @pytest.mark.oap_part2 def test_deploy_merge_valid_io_min_max_occurs_as_str_or_int(self): """ Test validates that I/O definitions with ``minOccurs`` and/or ``maxOccurs`` are permitted as both integer and @@ -1935,6 +1939,7 @@ def test_execute_job_with_accept_languages(self): "Expected error description to indicate bad language" ) + @pytest.mark.oap_part1 @mocked_aws_config @mocked_aws_s3 @mocked_http_file @@ -2111,6 +2116,7 @@ def tmp(value): assert processed_values["test_reference_http_value"] == "THIS IS A GENERATED FILE FOR HTTP TEST" assert processed_values["test_reference_file_value"] == "THIS IS A GENERATED FILE FOR FILE TEST" + @pytest.mark.oap_part1 def test_execute_job_with_inline_input_values(self): """ Validates that the job can receive an object and array types inputs and process them as expected. @@ -2246,6 +2252,7 @@ def test_execute_job_with_inline_input_values(self): assert processed_values["measureFloatInput"] == 10.2 assert processed_values["measureFileInput"] == {"VALUE": {"REF": 1, "MEASUREMENT": 10.3, "UOM": "M"}} + @pytest.mark.oap_part1 def test_execute_job_with_bbox(self): body = self.retrieve_payload("EchoBoundingBox", "deploy", local=True) proc = self.fully_qualified_test_name(self._testMethodName) @@ -2281,7 +2288,13 @@ def test_execute_job_with_bbox(self): "Expected the BBOX CRS URI to be interpreted and validated by known WPS definitions." ) + @pytest.mark.oap_part3 def test_execute_job_with_collection_input_geojson_feature_collection(self): + """ + Validate parsing and handling of ``collection`` specified in an input reference to a :term:`GeoJSON` file. + + .. versionadded:: 5.8 + """ name = "EchoFeatures" body = self.retrieve_payload(name, "deploy", local=True) proc = self.fully_qualified_test_name(self._testMethodName) @@ -2331,12 +2344,18 @@ def test_execute_job_with_collection_input_geojson_feature_collection(self): out_data = json.load(out_fd) assert out_data["features"] == col_feats["features"] + @pytest.mark.oap_part3 @parameterized.expand([ # note: the following are not *actually* filtering, but just validating formats are respected across code paths ("POST", "cql2-json", {"op": "=", "args": [{"property": "name"}, "test"]}), ("GET", "cql2-text", "property.name = 'test'"), ]) def test_execute_job_with_collection_input_ogc_features(self, filter_method, filter_lang, filter_value): + """ + Validate parsing and handling of ``collection`` specified in an input with an `OGC API - Features` reference. + + .. versionadded:: 5.8 + """ name = "EchoFeatures" body = self.retrieve_payload(name, "deploy", local=True) proc = self.fully_qualified_test_name(self._testMethodName) @@ -2394,6 +2413,159 @@ def test_execute_job_with_collection_input_ogc_features(self, filter_method, fil out_data = json.load(out_fd) assert out_data["features"] == col_feats["features"] + @pytest.mark.oap_part3 + def test_execute_job_with_collection_input_stac(self): + """ + Validate parsing and handling of ``collection`` specified in an input with :term:`STAC` :term:`API` endpoint. + + Also, evaluate the dynamic insertion of a ``properties`` definition. + + .. versionadded:: 6.0 + """ + name = "EchoFeatures" + body = self.retrieve_payload(name, "deploy", local=True) + proc = self.fully_qualified_test_name(self._testMethodName) + self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc) + + with contextlib.ExitStack() as stack: + tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames + tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # pylint: disable=R1732 + tmp_svr = stack.enter_context( + mocked_file_server(tmp_dir, tmp_host, settings=self.settings, mock_browse_index=True) + ) + exec_body_val = self.retrieve_payload(name, "execute", local=True) + col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON + filter_lang = "cql2-json" + filter_value = {"op": "=", "args": [{"property": "name"}, "test"]} + field_props = {"variable": "1 + 1"} # not in search request, separate post-operation + search_body = { + "filter": filter_value, + "filter-lang": filter_lang, + } + search_match = responses.matchers.json_params_matcher(search_body) + tmp_svr.add("POST", f"{tmp_host}/search", json=col_feats, match=[search_match]) + + col_exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "inputs": { + "features": { + "collection": f"{tmp_host}/collections/test", + "format": ExecuteCollectionFormat.STAC, + "type": ContentType.APP_GEOJSON, + "filter-lang": filter_lang, + "filter": filter_value, + "properties": field_props, + } + } + } + + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + proc_url = f"/processes/{proc}/execution" + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, + data=col_exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + + status_url = resp.json["location"] + results = self.monitor_job(status_url) + assert "features" in results + + job_id = status_url.rsplit("/", 1)[-1] + wps_dir = get_wps_output_dir(self.settings) + job_dir = os.path.join(wps_dir, job_id) + job_out = os.path.join(job_dir, "features", "features.geojson") + assert os.path.isfile(job_out), f"Invalid output file not found: [{job_out}]" + with open(job_out, mode="r", encoding="utf-8") as out_fd: + out_data = json.load(out_fd) + + assert "features" in out_data and isinstance(out_data["features"], list) + assert all("properties" in feat for feat in out_data["features"]) + out_prop = {"name": "test"} + out_prop.update(field_props) + for feat_src, feat_out in zip(col_feats["features"], out_data["features"]): + assert feat_src["properties"] == {"name": "test"} + assert out_data["properties"] == out_prop + + @pytest.mark.oap_part3 + def test_execute_job_with_nested_process_properties_field_modifier(self): + """ + Validate parsing and handling of ``properties`` specified for a nested process execution. + + .. versionadded:: 6.0 + """ + name = "EchoFeatures" + body = self.retrieve_payload(name, "deploy", local=True) + proc = self.fully_qualified_test_name(self._testMethodName) + self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc) + + with contextlib.ExitStack() as stack: + tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames + tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # pylint: disable=R1732 + tmp_svr = stack.enter_context( + mocked_file_server(tmp_dir, tmp_host, settings=self.settings, mock_browse_index=True) + ) + + exec_body_val = self.retrieve_payload(name, "execute", local=True) + col_file = os.path.join(tmp_dir, "test.json") + col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON + with open(col_file, mode="w", encoding="utf-8") as tmp_feature_collection_geojson: + json.dump(col_feats, tmp_feature_collection_geojson) + + filter_value = { + "filter": "property.name = 'test'" + } + filter_match = responses.matchers.json_params_matcher(filter_value) + tmp_svr.add("POST", f"{tmp_host}/collections/test/items", json=col_feats, match=[filter_match]) + + proc_url = f"/processes/{proc}" + exec_url = f"{proc_url}/execution" + col_exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "process": proc_url, + "inputs": { + "features": { + "process": proc_url, + "inputs": { + "features": { + # accessed directly as a static GeoJSON FeatureCollection + "collection": "https://mocked-file-server.com/test.json", + "format": ExecuteCollectionFormat.GEOJSON, + "schema": "http://www.opengis.net/def/glossary/term/FeatureCollection", + "filter": "property.name = 'test'", + "properties": { + "nested": "1 / 2" + } + } + }, + "properties": { + "extra": 3.1416 + } + } + } + } + + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + resp = mocked_sub_requests(self.app, "post_json", exec_url, timeout=5, + data=col_exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + + status_url = resp.json["location"] + results = self.monitor_job(status_url) + assert "features" in results + + job_id = status_url.rsplit("/", 1)[-1] + wps_dir = get_wps_output_dir(self.settings) + job_dir = os.path.join(wps_dir, job_id) + job_out = os.path.join(job_dir, "features", "features.geojson") + assert os.path.isfile(job_out), f"Invalid output file not found: [{job_out}]" + with open(job_out, mode="r", encoding="utf-8") as out_fd: + out_data = json.load(out_fd) + + assert out_data["features"] == col_feats["features"] + def test_execute_job_with_context_output_dir(self): cwl = { "cwlVersion": "v1.0", diff --git a/tests/functional/utils.py b/tests/functional/utils.py index b995d919e..6a372fc24 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -92,11 +92,8 @@ def request(cls, method, url, *args, **kwargs): def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["deploy"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool - ): # type: (...) -> Union[ProcessDeployment, Dict[str, JSON]] + **kwargs, # type: Any + ): # type: (...) -> ProcessDeployment ... @classmethod @@ -104,11 +101,8 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["describe"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool - ): # type: (...) -> Union[ProcessDescription, Dict[str, JSON]] + **kwargs, # type: Any + ): # type: (...) -> ProcessDescription ... @classmethod @@ -116,11 +110,8 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["execute", "quotation"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool - ): # type: (...) -> Union[ProcessExecution, Dict[str, JSON]] + **kwargs, # type: Any + ): # type: (...) -> ProcessExecution ... @classmethod @@ -128,10 +119,7 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["package"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool + **kwargs, # type: Any ): # type: (...) -> CWL ... @@ -140,10 +128,7 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["estimator"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool + **kwargs, # type: Any ): # type: (...) -> Dict[str, JSON] ... @@ -152,10 +137,7 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: ReferenceType - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[True] - location=None, # type: Optional[str] - local=False, # type: bool + **kwargs, # type: Any ): # type: (...) -> str ... @@ -378,10 +360,8 @@ def describe_process(cls, process_id, describe_schema=ProcessSchema.OGC): @overload def deploy_process(cls, payload, # type: JSON - process_id=None, # type: Optional[str] describe_schema=ProcessSchema.OGC, # type: ProcessSchemaOGCType - mock_requests_only_local=True, # type: bool - add_package_requirement=True, # type: bool + **kwargs, # type: Any ): # type: (...) -> Tuple[ProcessDescriptionMapping, CWL] ... @@ -389,10 +369,8 @@ def deploy_process(cls, @overload def deploy_process(cls, payload, # type: JSON - process_id=None, # type: Optional[str] describe_schema=ProcessSchema.OGC, # type: ProcessSchemaOLDType - mock_requests_only_local=True, # type: bool - add_package_requirement=True, # type: bool + **kwargs, # type: Any ): # type: (...) -> Tuple[ProcessDescriptionListing, CWL] ... diff --git a/weaver/processes/builtin/collection_processor.py b/weaver/processes/builtin/collection_processor.py index 3fe2d3d8b..73daea319 100644 --- a/weaver/processes/builtin/collection_processor.py +++ b/weaver/processes/builtin/collection_processor.py @@ -31,6 +31,7 @@ get_cwl_file_format, get_extension ) +from weaver.processes.builtin.field_modifier_processor import process_field_modifiers # isort:skip # noqa: E402 from weaver.processes.builtin.utils import ( # isort:skip # noqa: E402 get_package_details, is_geojson_url, @@ -151,6 +152,16 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO if not (col_resp.status_code == 200 and "features" in col_json): raise ValueError(f"Could not parse [{col_href}] as a GeoJSON FeatureCollection.") + col_json = process_field_modifiers( + col_args.get("filter"), + col_args.get("filter-crs"), + col_args.get("filter-lang"), + col_args.get("properties"), + col_args.get("sortBy"), + col_json, + output_dir, + ) + for i, feat in enumerate(col_json["features"]): path = os.path.join(output_dir, f"feature-{i}.geojson") with open(path, mode="w", encoding="utf-8") as file: @@ -165,16 +176,29 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO if "-" in arg: col_args[arg.replace("-", "_")] = col_args.pop(arg) known_params = set(inspect.signature(ItemSearch).parameters) - known_params -= {"url", "method", "stac_io", "client", "collection", "ids", "modifier"} - for param in set(col_args) - known_params: + known_params -= {"url", "method", "stac_io", "client", "collection", "ids", "properties"} + unknown_params = set(col_args) - known_params + for param in unknown_params: col_args.pop(param) + # STAC client can be-process filters and sorting server-side + # only perform the remaining properties modifier operations locally + col_properties = col_args.pop("properties", None) + col_field_modifier = None + if col_properties: + col_field_modifier = lambda obj: process_field_modifiers( # noqa: E731 # pylint: disable=C3001 + obj.dict(), + output_dir, + properties=col_properties, + ) + timeout = col_args.pop("timeout", 10) search = ItemSearch( - url=api_url, + url=f"{api_url}/search", method="POST", stac_io=StacApiIO(timeout=timeout, max_retries=3), # FIXME: add 'headers' with authorization/cookies? collections=col_id, + modifier=col_field_modifier, **col_args ) for item in search.items(): diff --git a/weaver/processes/builtin/field_modifier_processor.cwl b/weaver/processes/builtin/field_modifier_processor.cwl new file mode 100644 index 000000000..9e438e751 --- /dev/null +++ b/weaver/processes/builtin/field_modifier_processor.cwl @@ -0,0 +1,54 @@ +#! /usr/bin/env cwl-runner +cwlVersion: v1.0 +class: CommandLineTool +id: field_modifier_processor +label: Field Modifier Processor +doc: | + Performs field modification over properties and contents using the specified input definitions. +# target the installed python pointing to weaver conda env to allow imports +baseCommand: ${WEAVER_ROOT_DIR}/bin/python +arguments: ["${WEAVER_ROOT_DIR}/weaver/processes/builtin/properties_processor.py", "-o", $(runtime.outdir)] +inputs: + properties: + doc: Properties definition submitted to the process and to be generated from input values. + type: File + format: "iana:application/json" + inputBinding: + prefix: --properties + filter: + doc: Filter definition submitted to the process and to be generated from input values. + type: File + format: "iana:application/json" + inputBinding: + prefix: --filter + filter-crs: + doc: Filter Coordinate Reference System (CRS) to employ with the 'filter' parameter. + type: string + default: "EPSG:4326" + inputBinding: + prefix: --filter-crs + filter-lang: + doc: Filter language to interpret the 'filter' parameter. + type: string? + inputBinding: + prefix: --filter-lang + sortBy: + doc: Sorting definition with relevant properties and ordering direction. + type: string? + inputBinding: + prefix: --sortby + values: + doc: Values available for content field modification. + type: File + format: "iana:application/json" + inputBinding: + prefix: -V +outputs: + referenceOutput: + doc: Generated file contents from specified field modifiers. + type: File + # note: important to omit 'format' here, since we want to preserve the flexibility to retrieve 'any' reference + outputBinding: + outputEval: $(runtime.outdir)/* +$namespaces: + iana: "https://www.iana.org/assignments/media-types/" diff --git a/weaver/processes/builtin/properties_processor.py b/weaver/processes/builtin/field_modifier_processor.py similarity index 52% rename from weaver/processes/builtin/properties_processor.py rename to weaver/processes/builtin/field_modifier_processor.py index 71456f498..d0d48eb6d 100644 --- a/weaver/processes/builtin/properties_processor.py +++ b/weaver/processes/builtin/field_modifier_processor.py @@ -27,6 +27,11 @@ from weaver.typedefs import ( CWL_IO_ValueMap, + FieldModifierFilter, + FieldModifierFilterCRS, + FieldModifierFilterLang, + FieldModifierProperties, + FieldModifierSortBy, JSON, Path, ) @@ -41,7 +46,7 @@ # process details __version__ = "1.0" -__title__ = "Properties Processor" +__title__ = "Field Modifier Processor" __abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative OUTPUT_CWL_JSON = "cwl.output.json" @@ -56,32 +61,54 @@ def compute_property(property_name, calculation, properties): properties.update({property_name: result}) -def process_properties(input_properties, input_values, output_dir, logger=LOGGER): - # type: (Dict[str, str], Dict[str, JSON], Path, LoggerHandler) -> JSON +def process_field_modifiers( + values, # type: Dict[str, JSON] + output_dir, # type: Path + *, # force named keyword arguments after + filter=None, # type: FieldModifierFilter, + filter_crs=None, # type: FieldModifierFilterCRS, + filter_lang=None, # type: FieldModifierFilterLang, + properties=None, # type: FieldModifierProperties, + sortby=None, # type: FieldModifierSortBy, + logger=LOGGER, # type: LoggerHandler +): # type: (...) -> JSON """ Processor of a ``properties`` definition for an input or output. - :param input_properties: - Properties definition submitted to the process and to be generated from input values. - :param input_values: - Values available for properties generation. + :param values: + Values available for properties modification. :param output_dir: Directory to write the output (provided by the :term:`CWL` definition). + :param properties: + Properties definition submitted to the process and to be generated from input values. :param logger: Optional logger handler to employ. :return: File reference containing the resolved properties. """ logger.log( # pylint: disable=E1205 # false positive logging.INFO, - "Process [{}] Got arguments: input_properties={}, input_values={} output_dir=[{}]", + ( + "Process [{}] Got arguments:\n" + " filter={}\n" + " filter_crs={}\n" + " filter_lang={}\n" + " properties={}\n" + " sortby={}\n" + " values={}\n" + " output_dir=[{}]" + ), PACKAGE_NAME, - Lazify(lambda: repr_json(input_properties, indent=2)), - Lazify(lambda: repr_json(input_values, indent=2)), + Lazify(lambda: repr_json(filter, indent=2)), + Lazify(lambda: repr_json(filter_crs, indent=2)), + Lazify(lambda: repr_json(filter_lang, indent=2)), + Lazify(lambda: repr_json(properties, indent=2)), + Lazify(lambda: repr_json(sortby, indent=2)), + Lazify(lambda: repr_json(values, indent=2)), output_dir, ) os.makedirs(output_dir, exist_ok=True) # sort properties later if they depend on other ones, the least dependencies to be computed first - props_deps = {prop: 0 for prop in input_properties} - for prop, calc in input_properties.items(): + props_deps = {prop: 0 for prop in properties} + for prop, calc in properties.items(): for prop_dep in props_deps: if prop == prop_dep: if prop in calc: @@ -92,7 +119,7 @@ def process_properties(input_properties, input_values, output_dir, logger=LOGGER if not filter(lambda dep: dep[-1] == 0, props_deps.items()): raise ValueError("Invalid properties all depend on another one. Impossible resolution order.") props = sorted( - list(input_properties.items()), + list(properties.items()), key=lambda p: props_deps[p[0]] ) @@ -104,21 +131,36 @@ def process_properties(input_properties, input_values, output_dir, logger=LOGGER return properties -def process_cwl(input_properties, input_values, output_dir): - # type: (Dict[str, str], Dict[str, JSON], Path) -> CWL_IO_ValueMap - out_props = process_properties(input_properties, input_values, output_dir) - prop_file_path = os.path.join(output_dir, f"{uuid.uuid4()}.json") - with open(prop_file_path, mode="w", encoding="utf-8") as prop_file: - json.dump(out_props, prop_file, indent=2) +def process_cwl( + input_filter, # type: FieldModifierFilter, + input_filter_crs, # type: FieldModifierFilterCRS, + input_filter_lang, # type: FieldModifierFilterLang, + input_properties, # type: FieldModifierProperties, + input_sortby, # type: FieldModifierSortBy, + input_values, # type: Dict[str, JSON] + output_dir, # type: Path +): # type: (...) -> CWL_IO_ValueMap + result = process_field_modifiers( + values=input_values, + output_dir=output_dir, + filter=input_filter, + filter_crs=input_filter_crs, + filter_lang=input_filter_lang, + properties=input_properties, + sortby=input_sortby, + ) + file_path = os.path.join(output_dir, f"{uuid.uuid4()}.json") + with open(file_path, mode="w", encoding="utf-8") as mod_file: + json.dump(result, mod_file, indent=2) out_cwl_file = { "class": "File", - "path": prop_file_path, - "format": get_cwl_file_format(ContentType.APP_JSON), + "path": file_path, + "format": get_cwl_file_format(ContentType.APP_JSON, make_reference=True), } cwl_outputs = {"referenceOutput": out_cwl_file} # output ID must match the one used in CWL definition cwl_out_path = os.path.join(output_dir, OUTPUT_CWL_JSON) - with open(cwl_out_path, mode="w", encoding="utf-8") as file: - json.dump(cwl_outputs, file) + with open(cwl_out_path, mode="w", encoding="utf-8") as cwl_out_file: + json.dump(cwl_outputs, cwl_out_file) return cwl_outputs @@ -126,30 +168,63 @@ def main(*args): # type: (*str) -> None LOGGER.info("Process [%s] Parsing inputs...", PACKAGE_NAME) parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "-F", "--filter", + dest="input_filter", + metavar="INPUT_FILTER", + required=False, + help="Filter definition submitted to the process and to be generated from input values.", + ) + parser.add_argument( + "--filter-crs", + dest="input_filter_crs", + metavar="INPUT_FILTER_CRS", + required=False, + help="Filter Coordinate Reference System (CRS) to employ with the 'filter' parameter.", + ) + parser.add_argument( + "--filter-lang", + dest="input_filter_lang", + metavar="INPUT_FILTER_LANG", + required=False, + help="Filter language to interpret the 'filter' parameter.", + ) parser.add_argument( "-P", "--properties", + dest="input_properties", metavar="INPUT_PROPERTIES", - required=True, + required=False, help="Properties definition submitted to the process and to be generated from input values.", ) + parser.add_argument( + "-S", "--sortby", "--sortBy", "--sort-by", + dest="input_sortby", + metavar="INPUT_SORTBY", + required=False, + help="Sorting definition with relevant properties and ordering direction.", + ) parser.add_argument( "-V", "--values", + dest="input_values", metavar="INPUT_VALUES", required=True, help="Values available for properties generation.", ) parser.add_argument( "-o", "--outdir", + dest="output_dir", metavar="OUTDIR", required=True, help="Output directory of the retrieved data.", ) ns = parser.parse_args(*args) LOGGER.info("Process [%s] Loading properties input from file '%s'.", PACKAGE_NAME, ns.properties) - prop_in = load_file(ns.properties) + prop_in = load_file(ns.input_properties) LOGGER.info("Process [%s] Loading values input from file '%s'.", PACKAGE_NAME, ns.values) - val_in = load_file(ns.values) - sys.exit(process_cwl(prop_in, val_in, ns.outdir) is not None) + val_in = load_file(ns.input_values) + params = dict(**vars(ns)) + params.update({"input_properties": prop_in, "input_values": val_in}) + sys.exit(process_cwl(**params) is not None) if __name__ == "__main__": diff --git a/weaver/processes/builtin/properties_processor.cwl b/weaver/processes/builtin/properties_processor.cwl deleted file mode 100644 index 30d7a715b..000000000 --- a/weaver/processes/builtin/properties_processor.cwl +++ /dev/null @@ -1,32 +0,0 @@ -#! /usr/bin/env cwl-runner -cwlVersion: v1.0 -class: CommandLineTool -id: properties_processor -label: Properties Processor -doc: | - Generates properties contents using the specified input definitions. -# target the installed python pointing to weaver conda env to allow imports -baseCommand: ${WEAVER_ROOT_DIR}/bin/python -arguments: ["${WEAVER_ROOT_DIR}/weaver/processes/builtin/properties_processor.py", "-o", $(runtime.outdir)] -inputs: - properties: - doc: Properties definition submitted to the process and to be generated from input values. - type: File - format: "iana:application/json" - inputBinding: - prefix: -P - values: - doc: Values available for properties generation. - type: File - format: "iana:application/json" - inputBinding: - prefix: -V -outputs: - referenceOutput: - doc: Generated file contents from specified properties. - type: File - # note: important to omit 'format' here, since we want to preserve the flexibility to retrieve 'any' reference - outputBinding: - outputEval: $(runtime.outdir)/* -$namespaces: - iana: "https://www.iana.org/assignments/media-types/" diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index dc9a35746..6b23d8812 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -38,7 +38,7 @@ from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode from weaver.processes import wps_package from weaver.processes.builtin.collection_processor import process_collection -from weaver.processes.builtin.properties_processor import process_properties +from weaver.processes.builtin.field_modifier_processor import process_field_modifiers from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema from weaver.processes.convert import ( convert_input_values_schema, @@ -664,24 +664,21 @@ def parse_wps_inputs(wps_process, job, container=None): f"Abort execution. Cannot map multiple outputs from {proc_uri} " f"to input [{input_id}] of [{job.process}]." ) - resolved_input_values = [(results[0], input_info)] + results = results[0] + field_modifier_props = input_value.get("properties") + if field_modifier_props: + results = process_field_modifiers( + results, + out_dir, + properties=field_modifier_props, + ) + + resolved_input_values = [(results, input_info)] # typical file/data else: resolved_input_values = [(input_value, input_info)] - # post-handling of properties - properties = input_value.get("properties") if isinstance(input_value, dict) else None - if properties: - input_prop_path = os.path.join(job.tmpdir, "inputs", input_id) - # FIXME: handle other cross-input refs? - # (ie: parametrized I/O in https://docs.ogc.org/DRAFTS/21-009.html#section_deployable_workflows) - input_prop_values = {input_id: resolved_input_values} - resolved_input_values = process_properties( - properties, - input_prop_values, - input_prop_path, - ) resolved_inputs.extend(resolved_input_values) for input_value, input_info in resolved_inputs: diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 09709b4d1..4b49a99f4 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -430,6 +430,30 @@ class CWL_SchemaName(Protocol): DataSource = Union[DataSourceFileRef, DataSourceOpenSearch] DataSourceConfig = Dict[str, DataSource] # JSON/YAML file contents + FieldModifierFilter = Union[ + str, + JSON, # CQL or other + ] + FieldModifierFilterCRS = str + FieldModifierFilterLang = str + FieldModifierProperties = Dict[str, str] + FieldModifierSortByItem = TypedDict("FieldModifierSortByItem", { + "field": str, + "direction": Literal["asc", "desc"], + }, total=True) + FieldModifierSortBy = Union[ + str, + List[str], + List[FieldModifierSortByItem], + ] + FiledModifiers = TypedDict("FiledModifiers", { + "filter": Optional[FieldModifierFilter], + "filter-crs": Optional[FieldModifierFilterCRS], + "filter-lang": Optional[FieldModifierFilterLang], + "properties": Optional[FieldModifierProperties], + "sortBy": Optional[FieldModifierSortBy], + }, total=False) + JobValueBbox = TypedDict("JobValueBbox", { "bbox": Required[List[Number]], "crs": NotRequired[str], @@ -452,12 +476,17 @@ class CWL_SchemaName(Protocol): }, total=False) JobValueCollection = TypedDict("JobValueCollection", { "collection": Required[str], - "filter": Optional[JSON], - "filter-crs": Optional[str], - "filter-lang": Optional[str], - "sortBy": Optional[str], # FIXME: JSON? (https://github.com/opengeospatial/ogcapi-processes/issues/429) + "filter": Optional[FieldModifierFilter], + "filter-crs": Optional[FieldModifierFilterCRS], + "filter-lang": Optional[FieldModifierFilterLang], + "properties": Optional[FieldModifierProperties], + "sortBy": Optional[FieldModifierSortBy], }, total=False) - JobValueNestedProcess = "ProcessExecution" # type: TypeAlias + JobValueNestedProcessOnly = "ProcessExecution" # type: TypeAlias + JobValueNestedProcess = Union[ + JobValueNestedProcessOnly, + FiledModifiers, + ] JobValueData = TypedDict("JobValueData", { "data": Required[AnyValueType], }, total=False) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index f3e8f4670..421cdce41 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -1608,6 +1608,18 @@ def deserialize(self, cstruct): return result +class PropertiesFieldModifierExpression(ExtendedMappingSchema): + prop = AnyFilterExpression( + variable="{property}", + description="Expression that defines how to compute the property.", + validator=Length(min=1), + ) + + +class PropertiesFieldModifierSchema(ExtendedMappingSchema): + properties = PropertiesFieldModifierExpression(missing=drop) + + class SortByExpression(ExpandStringList, ExtendedSchemaNode): schema_type = String default = None @@ -1619,9 +1631,38 @@ class SortByExpression(ExpandStringList, ExtendedSchemaNode): ) +class SortByDirection(ExtendedSchemaNode): + schema_type = String + validator = OneOf(["asc", "desc"]) + + +class SortByObject(StrictMappingSchema): + field = ExtendedSchemaNode(String()) + direction = SortByDirection() + + +class SortByItem(OneOfKeywordSchema): + _one_of = [ + SortByExpression(), + SortByObject(), + ] + + +class SortByList(ExtendedSequenceSchema): + item = SortByItem() + validator = Length(min=1) + + +class SortByDefinition(OneOfKeywordSchema): + _one_of = [ + SortByExpression(), + SortByList(), + ] + + class SortBySchema(ExtendedMappingSchema): - sort_by_lower = SortByExpression(name="sortby", missing=drop) - sort_by_upper = SortByExpression(name="sortBy", missing=drop) + sort_by_lower = SortByDefinition(name="sortby", missing=drop) + sort_by_upper = SortByDefinition(name="sortBy", missing=drop) def deserialize(self, cstruct): # type: (JSON) -> Union[JSON, colander.null] @@ -1636,13 +1677,19 @@ def deserialize(self, cstruct): return result if result.get("sortby"): # keep only 'official' "sortBy" from OGC API Processes - # others OGC APIs use "sortby", but their query parameters are usually case-insensitive + # others APIs (Features, Records, STAC) use "sortby", but their query parameter is usually case-insensitive if not result.get("sortBy"): result["sortBy"] = result["sortby"] del result["sortby"] return result +class FieldModifierSchema(FilterSchema, SortBySchema, PropertiesFieldModifierSchema): + """ + Field modifiers that can operation on properties identified within the referenced content definition. + """ + + class SupportedCRS(ExtendedMappingSchema): crs = AnyCRS(title="CRS", description="Coordinate Reference System") default = ExtendedSchemaNode(Boolean(), missing=drop) @@ -3901,7 +3948,7 @@ class ExecuteCollectionFormatEnum(ExtendedSchemaNode): validator = OneOf(ExecuteCollectionFormat.values()) -class ExecuteCollectionInput(FilterSchema, SortBySchema, PermissiveMappingSchema): +class ExecuteCollectionInput(FieldModifierSchema, PermissiveMappingSchema): description = inspect.cleandoc(""" Reference to a 'collection' that can optionally be filtered, sorted, or parametrized. @@ -4301,7 +4348,7 @@ class ExecuteInputOutputs(ExtendedMappingSchema): ) -class ExecuteParameters(ExecuteInputOutputs): +class ExecuteParameters(ExtendedMappingSchema): """ Basic execution parameters that can be submitted to run a process. @@ -4327,17 +4374,21 @@ class ExecuteParameters(ExecuteInputOutputs): subscribers = JobExecuteSubscribers(missing=drop) -class ExecuteProcessParameters(ExecuteParameters): +class ExecuteProcessParameters(ExecuteParameters, ExecuteInputOutputs, FieldModifierSchema): title = "ExecuteProcessParameters" _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" _sort_first = [ "title", "process", + "mode", + "response", "inputs", "outputs", + "filter", + "filter-crs", + "filter-lang", "properties", - "mode", - "response", + "sortBy", "subscribers", ] _title = JobTitle(name="title", missing=drop) @@ -4382,11 +4433,15 @@ class Execute(AllOfKeywordSchema): "title", "status", "process", + "mode", + "response", "inputs", "outputs", + "filter", + "filter-crs", + "filter-lang", "properties", - "mode", - "response", + "sortBy", "subscribers", ] _all_of = [ From c6c04bed7d4aa8a780804fe587f76bd35ef013df Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 12 Nov 2024 23:17:14 -0500 Subject: [PATCH 03/11] [wip] conceptual expr calculation of properties field modifier using pyparsing --- requirements.txt | 1 + tests/functional/test_builtin.py | 16 +- tests/functional/test_wps_package.py | 17 +- .../processes/builtin/collection_processor.py | 33 +- .../builtin/field_modifier_processor.cwl | 4 +- .../builtin/field_modifier_processor.py | 306 +++++++++++++++--- weaver/typedefs.py | 4 +- 7 files changed, 309 insertions(+), 72 deletions(-) diff --git a/requirements.txt b/requirements.txt index 1702261d4..4e3a4eb9e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -90,6 +90,7 @@ pygeofilter[fes,backend-native] # - https://github.com/celery/kombu/pull/1536 # - https://github.com/celery/celery/pull/7834 pymongo>=4.6.3 # either (pymongo>=4, kombu>=5.3.0b2) or (pymongo<4, celery<5.2) +pyparsing pyramid>=1.7.3 pyramid_beaker>=0.8 # see https://github.com/sontek/pyramid_celery/pull/102 to fix Python 3.12 support and other improvements diff --git a/tests/functional/test_builtin.py b/tests/functional/test_builtin.py index dd41a5270..23640eed7 100644 --- a/tests/functional/test_builtin.py +++ b/tests/functional/test_builtin.py @@ -20,7 +20,13 @@ ) from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode from weaver.formats import ContentEncoding, ContentType, get_format, repr_json -from weaver.processes.builtin import file_index_selector, jsonarray2netcdf, metalink2netcdf, register_builtin_processes +from weaver.processes.builtin import ( + field_modifier_processor, + file_index_selector, + jsonarray2netcdf, + metalink2netcdf, + register_builtin_processes +) from weaver.processes.constants import JobInputsOutputsSchema from weaver.status import Status from weaver.utils import create_metalink, fully_qualified_name, get_path_kvp @@ -1099,3 +1105,11 @@ def test_file_index_selector_invalid_out_dir(): with pytest.raises(ValueError) as err: file_index_selector.main("-f", "", "-i", "1", "-o", tmp_out_dir) assert "does not exist" in str(err.value) + + +def test_field_modifier_processor_expression_variables(): + expr = field_modifier_processor.create_expression_parser() + calc = "properties.eo:cloud_cover + 1" + var = {"properties.eo:cloud_cover": 2} + val = expr.parse_string(calc)[0].eval(var, dict.__getitem__) + assert val == 3.0 diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index e095af3a3..18ead1d80 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -2418,9 +2418,9 @@ def test_execute_job_with_collection_input_stac(self): """ Validate parsing and handling of ``collection`` specified in an input with :term:`STAC` :term:`API` endpoint. - Also, evaluate the dynamic insertion of a ``properties`` definition. - - .. versionadded:: 6.0 + .. versionadded:: 5.8 + .. versionchanged:: 6.0 + Evaluate the dynamic insertion of a ``properties`` definition into the retrieved collection features. """ name = "EchoFeatures" body = self.retrieve_payload(name, "deploy", local=True) @@ -2435,10 +2435,19 @@ def test_execute_job_with_collection_input_stac(self): ) exec_body_val = self.retrieve_payload(name, "execute", local=True) col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON + col_feats["features"][0]["properties"]["data"] = 10 + field_props = { + "properties.result": "A * (B + C) / properties.data", # = 21 + # intermediate variables, should not be set + "B": 123, + "C": -50, + "A": 3, + } filter_lang = "cql2-json" filter_value = {"op": "=", "args": [{"property": "name"}, "test"]} - field_props = {"variable": "1 + 1"} # not in search request, separate post-operation search_body = { + # note: 'properties' are not in search request, separate post-operation + "collections": ["test"], "filter": filter_value, "filter-lang": filter_lang, } diff --git a/weaver/processes/builtin/collection_processor.py b/weaver/processes/builtin/collection_processor.py index 73daea319..01cd0f147 100644 --- a/weaver/processes/builtin/collection_processor.py +++ b/weaver/processes/builtin/collection_processor.py @@ -90,7 +90,12 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO input_id = get_any_id(input_definition) logger.log( # pylint: disable=E1205 # false positive logging.INFO, - "Process [{}] Got arguments: collection_input={} output_dir=[{}], for input=[{}]", + ( + "Process [%s] got arguments:\n" + " collection_input=%s\n" + " output_dir=[%s]\n" + " input=[%s]" + ), PACKAGE_NAME, Lazify(lambda: repr_json(collection_input, indent=2)), output_dir, @@ -152,15 +157,7 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO if not (col_resp.status_code == 200 and "features" in col_json): raise ValueError(f"Could not parse [{col_href}] as a GeoJSON FeatureCollection.") - col_json = process_field_modifiers( - col_args.get("filter"), - col_args.get("filter-crs"), - col_args.get("filter-lang"), - col_args.get("properties"), - col_args.get("sortBy"), - col_json, - output_dir, - ) + col_json = process_field_modifiers(col_json, **col_args) for i, feat in enumerate(col_json["features"]): path = os.path.join(output_dir, f"feature-{i}.geojson") @@ -175,24 +172,24 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO for arg in list(col_args): if "-" in arg: col_args[arg.replace("-", "_")] = col_args.pop(arg) + + timeout = col_args.pop("timeout", 10) + col_properties = col_args.pop("properties", None) # will become 'modifier' callable + known_params = set(inspect.signature(ItemSearch).parameters) - known_params -= {"url", "method", "stac_io", "client", "collection", "ids", "properties"} + known_params -= {"url", "method", "stac_io", "client", "collections", "ids"} unknown_params = set(col_args) - known_params for param in unknown_params: col_args.pop(param) # STAC client can be-process filters and sorting server-side # only perform the remaining properties modifier operations locally - col_properties = col_args.pop("properties", None) col_field_modifier = None if col_properties: - col_field_modifier = lambda obj: process_field_modifiers( # noqa: E731 # pylint: disable=C3001 - obj.dict(), - output_dir, - properties=col_properties, - ) + def col_field_modifier(stac_feat_col): + for _feat in stac_feat_col.get("features", []): + process_field_modifiers(_feat, properties=col_properties) - timeout = col_args.pop("timeout", 10) search = ItemSearch( url=f"{api_url}/search", method="POST", diff --git a/weaver/processes/builtin/field_modifier_processor.cwl b/weaver/processes/builtin/field_modifier_processor.cwl index 9e438e751..c842dd100 100644 --- a/weaver/processes/builtin/field_modifier_processor.cwl +++ b/weaver/processes/builtin/field_modifier_processor.cwl @@ -16,7 +16,7 @@ inputs: inputBinding: prefix: --properties filter: - doc: Filter definition submitted to the process and to be generated from input values. + doc: Filter expression submitted to the process and to be generated from input values. type: File format: "iana:application/json" inputBinding: @@ -33,7 +33,7 @@ inputs: inputBinding: prefix: --filter-lang sortBy: - doc: Sorting definition with relevant properties and ordering direction. + doc: Sorting definition with relevant field names and ordering direction. type: string? inputBinding: prefix: --sortby diff --git a/weaver/processes/builtin/field_modifier_processor.py b/weaver/processes/builtin/field_modifier_processor.py index d0d48eb6d..cb0febe2b 100644 --- a/weaver/processes/builtin/field_modifier_processor.py +++ b/weaver/processes/builtin/field_modifier_processor.py @@ -3,7 +3,7 @@ Generates properties contents using the specified input definitions. """ import argparse -import ast +import functools import json import logging import os @@ -11,6 +11,20 @@ import uuid from typing import TYPE_CHECKING +from pyparsing import ( + Word, + nums, + alphas, + alphanums, + Combine, + Literal, + ZeroOrMore, + one_of, + OpAssoc, + infix_notation, + ParserElement, +) + CUR_DIR = os.path.abspath(os.path.dirname(__file__)) sys.path.insert(0, CUR_DIR) # root to allow 'from weaver import <...>' @@ -20,23 +34,27 @@ # pylint: disable=C0413,wrong-import-order from weaver.formats import ContentType, get_cwl_file_format # isort:skip # noqa: E402 from weaver.processes.builtin.utils import get_package_details # isort:skip # noqa: E402) -from weaver.utils import Lazify, load_file, repr_json, request_extra # isort:skip # noqa: E402 +from weaver.utils import Lazify, load_file, repr_json # isort:skip # noqa: E402 if TYPE_CHECKING: - from typing import Dict + from typing import Any, Callable, Dict, TypeAlias from weaver.typedefs import ( CWL_IO_ValueMap, - FieldModifierFilter, FieldModifierFilterCRS, + FieldModifierFilterExpression, FieldModifierFilterLang, FieldModifierProperties, FieldModifierSortBy, JSON, + Number, Path, ) from weaver.utils import LoggerHandler + PropertyGetter: TypeAlias = Callable[[object, str], Any] + PropertySetter: TypeAlias = Callable[[object, str, Any], None] + PACKAGE_NAME, PACKAGE_BASE, PACKAGE_MODULE = get_package_details(__file__) # setup logger since it is not run from the main 'weaver' app @@ -52,59 +70,257 @@ OUTPUT_CWL_JSON = "cwl.output.json" -def compute_property(property_name, calculation, properties): - # type: (str, str, Dict[str, JSON]) -> None +@functools.lru_cache(1024) +def create_expression_parser(): + # type: () -> ParserElement + """ + Creates a parser that can safely evaluate the underlying arithmetic expression with variable substitutions. + + .. seealso:: + A mixture of the following examples serve as reference for the expression implementation: + + - https://github.com/pyparsing/pyparsing/blob/master/examples/simpleArith.py + - https://github.com/pyparsing/pyparsing/blob/master/examples/eval_arith.py + - https://github.com/pyparsing/pyparsing/blob/master/examples/excel_expr.py + """ + + ParserElement.enablePackrat() + + class EvalConstant: + """ + Class to evaluate a parsed constant or variable. + """ + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, Number], PropertyGetter) -> Number + if self.value in variables: + return getter(variables, self.value) + else: + return float(self.value) + + class EvalSignOp: + """ + Class to evaluate expressions with a leading ``+`` or ``-`` sign. + """ + + def __init__(self, tokens): + self.sign, self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, Number], PropertyGetter) -> Number + mult = {"+": 1, "-": -1}[self.sign] + return mult * self.value.eval(variables, getter) + + def operator_operands(tokens): + """ + Generator to extract operators and operands in pairs. + """ + it = iter(tokens) + while 1: + try: + yield next(it), next(it) + except StopIteration: + break + + class EvalPowerOp: + """ + Class to evaluate power expressions. + """ + + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, Number], PropertyGetter) -> Number + res = self.value[-1].eval(variables, getter) + for val in self.value[-3::-2]: + res = val.eval(variables, getter) ** res + return res + + class EvalMultOp: + """ + Class to evaluate multiplication and division expressions. + """ + + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, Number], PropertyGetter) -> Number + prod = self.value[0].eval(variables, getter) + for op, val in operator_operands(self.value[1:]): + if op == "*": + prod *= val.eval(variables, getter) + if op == "/": + prod /= val.eval(variables, getter) + return prod + + class EvalAddOp: + """ + Class to evaluate addition and subtraction expressions. + """ + + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, Number], PropertyGetter) -> Number + _sum = self.value[0].eval(variables, getter) + for op, val in operator_operands(self.value[1:]): + if op == "+": + _sum += val.eval(variables, getter) + if op == "-": + _sum -= val.eval(variables, getter) + return _sum + + class EvalComparisonOp: + """ + Class to evaluate comparison expressions. + """ + opMap = { + "<": lambda a, b: a < b, + "<=": lambda a, b: a <= b, + ">": lambda a, b: a > b, + ">=": lambda a, b: a >= b, + "!=": lambda a, b: a != b, + "=": lambda a, b: a == b, + "LT": lambda a, b: a < b, + "LE": lambda a, b: a <= b, + "GT": lambda a, b: a > b, + "GE": lambda a, b: a >= b, + "NE": lambda a, b: a != b, + "EQ": lambda a, b: a == b, + "<>": lambda a, b: a != b, + } + + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, Number], PropertyGetter) -> Number + val1 = self.value[0].eval(variables, getter) + for op, val in operator_operands(self.value[1:]): + fn = EvalComparisonOp.opMap[op] + val2 = val.eval(variables, getter) + if not fn(val1, val2): + break + val1 = val2 + else: + return True + return False + + # define the parser + integer = Word(nums) + real = Combine(Word(nums) + "." + Word(nums)) + ident = Word(alphas, alphanums + "_", min=1) + variable = Combine(ident + ZeroOrMore((Literal(":") | Literal(".")) + ident)) + operand = real | integer | variable + + sign_op = one_of("+ -") + mult_op = one_of("* /") + plus_op = one_of("+ -") + exp_op = one_of("** ^") + + # use parse actions to attach EvalXXX constructors to sub-expressions + operand.setParseAction(EvalConstant) + arith_expr = infix_notation( + operand, + [ + (sign_op, 1, OpAssoc.RIGHT, EvalSignOp), + (exp_op, 2, OpAssoc.LEFT, EvalPowerOp), + (mult_op, 2, OpAssoc.LEFT, EvalMultOp), + (plus_op, 2, OpAssoc.LEFT, EvalAddOp), + ], + ) + + comparison_op = one_of("< <= > >= != = <> LT GT LE GE EQ NE") + comp_expr = infix_notation( + arith_expr, + [ + (comparison_op, 2, OpAssoc.LEFT, EvalComparisonOp), + ], + ) + return comp_expr + + +def evaluate_property( + properties, # type: Dict[str, JSON] + property_name, # type: str + property_expression, # type: str + property_getter=None, # type: PropertyGetter + property_setter=None, # type: PropertySetter +): # type: (...) -> None + """ + Evaluates the applicable property expression with variable retrieval and insertion in the destination object. + + :param properties: Mapping of available property variables and expressions. + :param property_name: Target property to evaluate. + :param property_expression: Calculation to be evaluated for the property, possibly referring to other properties. + :param property_getter: Function that knows how to retrieve a variable property from the properties' destination. + :param property_setter: Function that knows how to insert the evaluated property into the properties' destination. + """ + property_getter = property_getter or dict.__getitem__ + property_setter = property_setter or dict.__setitem__ ... # FIXME: ast to do eval safely - TBD: what about property pointing at file? - calc = calculation.lower() # handle 'Min()'->'min()' - names allowed by "well-known functions" - result = ast.literal_eval(calc) - properties.update({property_name: result}) + expr = create_expression_parser() + result = expr.parse_string(property_expression)[0].eval(properties, getter=property_getter) + property_setter(properties, property_name, result) def process_field_modifiers( - values, # type: Dict[str, JSON] - output_dir, # type: Path - *, # force named keyword arguments after - filter=None, # type: FieldModifierFilter, - filter_crs=None, # type: FieldModifierFilterCRS, - filter_lang=None, # type: FieldModifierFilterLang, - properties=None, # type: FieldModifierProperties, - sortby=None, # type: FieldModifierSortBy, - logger=LOGGER, # type: LoggerHandler -): # type: (...) -> JSON + values, # type: Dict[str, JSON] + *, # force named keyword arguments after + filter_expr=None, # type: FieldModifierFilterExpression, + filter_crs=None, # type: FieldModifierFilterCRS, + filter_lang=None, # type: FieldModifierFilterLang, + properties=None, # type: FieldModifierProperties, + property_getter=None, # type: PropertyGetter + property_setter=None, # type: PropertySetter + sortby=None, # type: FieldModifierSortBy, + logger=LOGGER, # type: LoggerHandler +): # type: (...) -> JSON """ - Processor of a ``properties`` definition for an input or output. + Processor of field modifiers for an input or output. - :param values: - Values available for properties modification. - :param output_dir: Directory to write the output (provided by the :term:`CWL` definition). - :param properties: - Properties definition submitted to the process and to be generated from input values. + .. note:: + Modifications are applied inline to the specified :paramref:`values` to allow integration + with various interfaces that have such expectations. For convenience, modified contents are + also returned for interfaces that expect the result as output. + + :param values: Values available for properties modification. + :param filter_expr: Filter expression submitted to the process and to be generated from input values. + :param filter_lang: Filter language to interpret the filter expression. + :param filter_crs: Filter Coordinate Reference System (CRS) to employ with the filter expression. + :param properties: Properties definition submitted to the process and to be generated from input values. + :param property_getter: Function that knows how to retrieve a variable property from the properties' destination. + :param property_setter: Function that knows how to insert the evaluated property into the properties' destination. + :param sortby: Sorting definition with relevant field names and ordering direction. :param logger: Optional logger handler to employ. :return: File reference containing the resolved properties. """ logger.log( # pylint: disable=E1205 # false positive logging.INFO, ( - "Process [{}] Got arguments:\n" - " filter={}\n" - " filter_crs={}\n" - " filter_lang={}\n" - " properties={}\n" - " sortby={}\n" - " values={}\n" - " output_dir=[{}]" + "Process [%s] got arguments:\n" + " filter_expr=%s\n" + " filter_crs=%s\n" + " filter_lang=%s\n" + " properties=%s\n" + " sortby=%s\n" + " values=%s" ), PACKAGE_NAME, - Lazify(lambda: repr_json(filter, indent=2)), + Lazify(lambda: repr_json(filter_expr, indent=2)), Lazify(lambda: repr_json(filter_crs, indent=2)), Lazify(lambda: repr_json(filter_lang, indent=2)), Lazify(lambda: repr_json(properties, indent=2)), Lazify(lambda: repr_json(sortby, indent=2)), Lazify(lambda: repr_json(values, indent=2)), - output_dir, ) - os.makedirs(output_dir, exist_ok=True) + properties = properties or {} # sort properties later if they depend on other ones, the least dependencies to be computed first props_deps = {prop: 0 for prop in properties} @@ -124,15 +340,16 @@ def process_field_modifiers( ) # compute the properties - properties = {} for prop, calc in props: - compute_property(prop, calc, properties) + evaluate_property(properties, prop, calc, property_getter=property_getter, property_setter=property_setter) + + # FIXME: handle filtering and sorting return properties def process_cwl( - input_filter, # type: FieldModifierFilter, + input_filter_expr, # type: FieldModifierFilterExpression, input_filter_crs, # type: FieldModifierFilterCRS, input_filter_lang, # type: FieldModifierFilterLang, input_properties, # type: FieldModifierProperties, @@ -142,8 +359,7 @@ def process_cwl( ): # type: (...) -> CWL_IO_ValueMap result = process_field_modifiers( values=input_values, - output_dir=output_dir, - filter=input_filter, + filter_expr=input_filter_expr, filter_crs=input_filter_crs, filter_lang=input_filter_lang, properties=input_properties, @@ -170,8 +386,8 @@ def main(*args): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "-F", "--filter", - dest="input_filter", - metavar="INPUT_FILTER", + dest="input_filter_expr", + metavar="INPUT_FILTER_EXPRESSION", required=False, help="Filter definition submitted to the process and to be generated from input values.", ) @@ -185,7 +401,7 @@ def main(*args): parser.add_argument( "--filter-lang", dest="input_filter_lang", - metavar="INPUT_FILTER_LANG", + metavar="INPUT_FILTER_LANGUAGE", required=False, help="Filter language to interpret the 'filter' parameter.", ) @@ -201,7 +417,7 @@ def main(*args): dest="input_sortby", metavar="INPUT_SORTBY", required=False, - help="Sorting definition with relevant properties and ordering direction.", + help="Sorting definition with relevant field names and ordering direction.", ) parser.add_argument( "-V", "--values", diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 4b49a99f4..0323defb2 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -430,7 +430,7 @@ class CWL_SchemaName(Protocol): DataSource = Union[DataSourceFileRef, DataSourceOpenSearch] DataSourceConfig = Dict[str, DataSource] # JSON/YAML file contents - FieldModifierFilter = Union[ + FieldModifierFilterExpression = Union[ str, JSON, # CQL or other ] @@ -447,7 +447,7 @@ class CWL_SchemaName(Protocol): List[FieldModifierSortByItem], ] FiledModifiers = TypedDict("FiledModifiers", { - "filter": Optional[FieldModifierFilter], + "filter": Optional[FieldModifierFilterExpression], "filter-crs": Optional[FieldModifierFilterCRS], "filter-lang": Optional[FieldModifierFilterLang], "properties": Optional[FieldModifierProperties], From 56e04ed4eb57b6bf9c49004b449d704f6a1a6c13 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 12 Nov 2024 23:21:16 -0500 Subject: [PATCH 04/11] fix imports linting --- .../builtin/field_modifier_processor.py | 16 ++++++++-------- weaver/processes/ogc_api_process.py | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/weaver/processes/builtin/field_modifier_processor.py b/weaver/processes/builtin/field_modifier_processor.py index cb0febe2b..4a7214f76 100644 --- a/weaver/processes/builtin/field_modifier_processor.py +++ b/weaver/processes/builtin/field_modifier_processor.py @@ -12,17 +12,17 @@ from typing import TYPE_CHECKING from pyparsing import ( - Word, - nums, - alphas, - alphanums, Combine, Literal, - ZeroOrMore, - one_of, OpAssoc, - infix_notation, ParserElement, + Word, + ZeroOrMore, + alphanums, + alphas, + infix_notation, + nums, + one_of ) CUR_DIR = os.path.abspath(os.path.dirname(__file__)) @@ -48,7 +48,7 @@ FieldModifierSortBy, JSON, Number, - Path, + Path ) from weaver.utils import LoggerHandler diff --git a/weaver/processes/ogc_api_process.py b/weaver/processes/ogc_api_process.py index 7d4b215d1..fb69d6003 100644 --- a/weaver/processes/ogc_api_process.py +++ b/weaver/processes/ogc_api_process.py @@ -9,12 +9,12 @@ from typing import Optional from weaver.typedefs import ( - JSON, CWL_ExpectedOutputs, ExecutionInputsMap, ExecutionOutputsMap, JobInputs, JobOutputs, + JSON, UpdateStatusPartialFunction ) from weaver.wps.service import WorkerRequest From f2b4db84f9bfed1d676bba802e35e909c0170f92 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 12 Nov 2024 23:47:33 -0500 Subject: [PATCH 05/11] fix field modifier checks --- tests/wps_restapi/test_swagger_definitions.py | 14 ++++++++- weaver/wps_restapi/swagger_definitions.py | 29 +++++++++++++++---- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/tests/wps_restapi/test_swagger_definitions.py b/tests/wps_restapi/test_swagger_definitions.py index a986c4c9c..e968d3101 100644 --- a/tests/wps_restapi/test_swagger_definitions.py +++ b/tests/wps_restapi/test_swagger_definitions.py @@ -387,7 +387,19 @@ def test_collection_input_filter_lang_case_insensitive(): }, { "collection": "https://example.com/collections/test", - "sortBy": ["name"], + "sortBy": [123], + }, + { + "collection": "https://example.com/collections/test", + "properties": "ABC", + }, + { + "collection": "https://example.com/collections/test", + "properties": [], + }, + { + "collection": "https://example.com/collections/test", + "properties": {}, }, ] ) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 421cdce41..6f831e51f 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -1579,7 +1579,7 @@ def convert(self, filter_expr, filter_lang): def deserialize(self, cstruct): # type: (JSON) -> Union[JSON, colander.null] result = super().deserialize(cstruct) - if not result: + if not cstruct: return result filter_expr = result.get("filter") filter_lang = result.get("filter-lang") @@ -1608,7 +1608,7 @@ def deserialize(self, cstruct): return result -class PropertiesFieldModifierExpression(ExtendedMappingSchema): +class PropertiesExpression(ExtendedMappingSchema): prop = AnyFilterExpression( variable="{property}", description="Expression that defines how to compute the property.", @@ -1616,8 +1616,18 @@ class PropertiesFieldModifierExpression(ExtendedMappingSchema): ) -class PropertiesFieldModifierSchema(ExtendedMappingSchema): - properties = PropertiesFieldModifierExpression(missing=drop) +class PropertiesSchema(ExtendedMappingSchema): + properties = PropertiesExpression(missing=drop, validator=Length(min=1)) + + def deserialize(self, cstruct): + result = super().deserialize(cstruct) + if "properties" in cstruct and "properties" not in result: + raise colander.Invalid( + node=self, + msg="Invalid properties expression could not be interpreted.", + value={"properties": repr_json(cstruct["properties"])}, + ) + return result class SortByExpression(ExpandStringList, ExtendedSchemaNode): @@ -1673,7 +1683,7 @@ def deserialize(self, cstruct): Therefore, additional fields must be left untouched. """ result = super().deserialize(cstruct) - if not result: + if not cstruct: return result if result.get("sortby"): # keep only 'official' "sortBy" from OGC API Processes @@ -1681,10 +1691,17 @@ def deserialize(self, cstruct): if not result.get("sortBy"): result["sortBy"] = result["sortby"] del result["sortby"] + sort_by = cstruct.get("sortby") or cstruct.get("sortBy") + if "sortBy" not in result and sort_by: + raise colander.Invalid( + node=self, + msg="Invalid sortBy expression could not be interpreted.", + value={"sortBy": repr_json(sort_by)}, + ) return result -class FieldModifierSchema(FilterSchema, SortBySchema, PropertiesFieldModifierSchema): +class FieldModifierSchema(FilterSchema, SortBySchema, PropertiesSchema): """ Field modifiers that can operation on properties identified within the referenced content definition. """ From 30ebce5a4fbb733c80ccb89110f83affbd852f83 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Wed, 13 Nov 2024 00:01:13 -0500 Subject: [PATCH 06/11] fix typing --- weaver/typedefs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 0323defb2..6c97a0949 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -476,7 +476,7 @@ class CWL_SchemaName(Protocol): }, total=False) JobValueCollection = TypedDict("JobValueCollection", { "collection": Required[str], - "filter": Optional[FieldModifierFilter], + "filter": Optional[FieldModifierFilterExpression], "filter-crs": Optional[FieldModifierFilterCRS], "filter-lang": Optional[FieldModifierFilterLang], "properties": Optional[FieldModifierProperties], From 76664bc620ba3d8bad93be6e1b33eddfa2ad93a6 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Wed, 13 Nov 2024 00:09:00 -0500 Subject: [PATCH 07/11] fix linting and params errors --- tests/functional/test_wps_package.py | 2 +- .../builtin/field_modifier_processor.py | 5 ++--- weaver/processes/execution.py | 16 ++++++++++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 18ead1d80..d4727aa5c 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -2494,7 +2494,7 @@ def test_execute_job_with_collection_input_stac(self): out_prop.update(field_props) for feat_src, feat_out in zip(col_feats["features"], out_data["features"]): assert feat_src["properties"] == {"name": "test"} - assert out_data["properties"] == out_prop + assert feat_out["properties"] == out_prop @pytest.mark.oap_part3 def test_execute_job_with_nested_process_properties_field_modifier(self): diff --git a/weaver/processes/builtin/field_modifier_processor.py b/weaver/processes/builtin/field_modifier_processor.py index 4a7214f76..acd6a2e96 100644 --- a/weaver/processes/builtin/field_modifier_processor.py +++ b/weaver/processes/builtin/field_modifier_processor.py @@ -71,7 +71,7 @@ @functools.lru_cache(1024) -def create_expression_parser(): +def create_expression_parser(): # pylint: disable=R1260,too-complex # type: () -> ParserElement """ Creates a parser that can safely evaluate the underlying arithmetic expression with variable substitutions. @@ -264,7 +264,6 @@ def evaluate_property( property_getter = property_getter or dict.__getitem__ property_setter = property_setter or dict.__setitem__ - ... # FIXME: ast to do eval safely - TBD: what about property pointing at file? expr = create_expression_parser() result = expr.parse_string(property_expression)[0].eval(properties, getter=property_getter) property_setter(properties, property_name, result) @@ -438,7 +437,7 @@ def main(*args): prop_in = load_file(ns.input_properties) LOGGER.info("Process [%s] Loading values input from file '%s'.", PACKAGE_NAME, ns.values) val_in = load_file(ns.input_values) - params = dict(**vars(ns)) + params = {**vars(ns)} params.update({"input_properties": prop_in, "input_values": val_in}) sys.exit(process_cwl(**params) is not None) diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 6b23d8812..0fd2b5907 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -665,12 +665,20 @@ def parse_wps_inputs(wps_process, job, container=None): f"to input [{input_id}] of [{job.process}]." ) results = results[0] - field_modifier_props = input_value.get("properties") - if field_modifier_props: + + field_modifier_properties = input_value.get("properties") + field_modifier_filter_expr = input_value.get("filter") + field_modifier_filter_crs = input_value.get("filter-crs") + field_modifier_filter_lang = input_value.get("filter-lang") + field_modifier_sortby = input_value.get("sortBy") + if field_modifier_properties or field_modifier_filter_expr or field_modifier_sortby: results = process_field_modifiers( results, - out_dir, - properties=field_modifier_props, + properties=field_modifier_properties, + filter_expr=field_modifier_filter_expr, + filter_crs=field_modifier_filter_crs, + filter_lang=field_modifier_filter_lang, + sortby=field_modifier_sortby, ) resolved_input_values = [(results, input_info)] From 8dddb04f8242ca6dad7d7198c0ff7d0dfa5eb23a Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 14 Nov 2024 10:01:53 -0500 Subject: [PATCH 08/11] fix schema converter typings --- weaver/wps_restapi/colander_extras.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 0c8d75dda..127ab7c35 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -430,7 +430,7 @@ def __init__(self, schemes=None, path_pattern=None, msg=None, flags=re.IGNORECAS if path_pattern: if isinstance(path_pattern, RegexPattern): path_pattern = path_pattern.pattern - # depending colander version: $ end-of-line, \Z end-of-string (before \n if any), or \z end-of-string (\0) + # depends on colander version: $ end-of-line, \Z end-of-string (before \n if any), or \z end-of-string (\0) index = -2 if regex.lower().endswith(r"\z") else -1 if regex.endswith("$") else 0 regex = rf"{regex[:index] + path_pattern}\Z" super(SchemeURL, self).__init__(regex, msg=msg, flags=flags) @@ -1398,7 +1398,7 @@ def _order_deserialize(cstruct, sort_first=None, sort_after=None): return result -class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): +class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase, TypeConverter): """ Mapping schema that supports auto-insertion of JSON-schema references provided in the definition. @@ -1530,23 +1530,24 @@ def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs return self._schema_deserialize(cstruct, schema_id, None) return cstruct - def convert_type(self, cstruct, dispatcher=None): # noqa # parameter to allow forwarding ref for override schemas - # type: (OpenAPISchema, Optional[TypeConversionDispatcher]) -> OpenAPISchema + @staticmethod + def convert_type(node, cstruct, dispatcher=None): # noqa # parameter to allow forwarding ref for override schemas + # type: (colander.SchemaNode, OpenAPISchema, Optional[TypeConversionDispatcher]) -> OpenAPISchema """ Converts the node to obtain the :term:`JSON` schema definition. """ schema_id = schema_meta = None - schema_id_include = getattr(self, "_schema_include", False) - schema_id_include_convert_type = getattr(self, "_schema_include_convert_type", False) - schema_meta_include = getattr(self, "_schema_meta_include", False) - schema_meta_include_convert_type = getattr(self, "_schema_meta_include_convert_type", False) - schema_extra = getattr(self, "_schema_extra", None) + schema_id_include = getattr(node, "_schema_include", False) + schema_id_include_convert_type = getattr(node, "_schema_include_convert_type", False) + schema_meta_include = getattr(node, "_schema_meta_include", False) + schema_meta_include_convert_type = getattr(node, "_schema_meta_include_convert_type", False) + schema_extra = getattr(node, "_schema_extra", None) if schema_id_include and schema_id_include_convert_type: - schema_id = getattr(self, "_schema", None) + schema_id = getattr(node, "_schema", None) if schema_meta_include and schema_meta_include_convert_type: - schema_meta = getattr(self, "_schema_meta", None) + schema_meta = getattr(node, "_schema_meta", None) if schema_id or schema_meta or schema_extra: - return self._schema_deserialize(cstruct, schema_meta, schema_id, schema_extra) + return node._schema_deserialize(cstruct, schema_meta, schema_id, schema_extra) return cstruct @staticmethod From 81e7561bead4e1e0f0ac1c4c62917ea0ed4ddb01 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 14 Nov 2024 10:02:10 -0500 Subject: [PATCH 09/11] fix function def linting --- weaver/processes/builtin/collection_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/weaver/processes/builtin/collection_processor.py b/weaver/processes/builtin/collection_processor.py index 01cd0f147..80b49da4d 100644 --- a/weaver/processes/builtin/collection_processor.py +++ b/weaver/processes/builtin/collection_processor.py @@ -184,11 +184,12 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO # STAC client can be-process filters and sorting server-side # only perform the remaining properties modifier operations locally - col_field_modifier = None if col_properties: def col_field_modifier(stac_feat_col): for _feat in stac_feat_col.get("features", []): process_field_modifiers(_feat, properties=col_properties) + else: + col_field_modifier = None search = ItemSearch( url=f"{api_url}/search", From 3c6237702efbc9e10ccdfd4efc35fe084f256be7 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 14 Nov 2024 23:45:07 -0500 Subject: [PATCH 10/11] working STAC collection with properties expression --- tests/functional/test_wps_package.py | 58 +++- .../processes/builtin/collection_processor.py | 328 +++++++++++------- .../builtin/field_modifier_processor.py | 212 ++++++++--- weaver/typedefs.py | 4 +- weaver/wps_restapi/swagger_definitions.py | 45 ++- 5 files changed, 450 insertions(+), 197 deletions(-) diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index d4727aa5c..ed26e28e0 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -2414,7 +2414,7 @@ def test_execute_job_with_collection_input_ogc_features(self, filter_method, fil assert out_data["features"] == col_feats["features"] @pytest.mark.oap_part3 - def test_execute_job_with_collection_input_stac(self): + def test_execute_job_with_collection_input_stac_as_features_with_properties(self): """ Validate parsing and handling of ``collection`` specified in an input with :term:`STAC` :term:`API` endpoint. @@ -2435,19 +2435,43 @@ def test_execute_job_with_collection_input_stac(self): ) exec_body_val = self.retrieve_payload(name, "execute", local=True) col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON - col_feats["features"][0]["properties"]["data"] = 10 + + # patch the original content to make it respect STAC validation + col_id = "test" + for idx, feat in enumerate(col_feats["features"]): + feat.update({ + "stac_version": "1.0.0", + "stac_extensions": [], + "collection": col_id, + "id": f"{col_id}-{idx}", + "assets": {}, + }) + feat["properties"].update({ + "datetime": "2024-01-01T00:00:00Z", + }) + + # setup input collection properties + var_A = 3 + var_B = 123 + var_C = -63 + var_data = 10 + expect_result = (var_A * (var_B + var_C) / var_data) ** 2 # = 324 + col_feats["features"][0]["properties"]["data"] = var_data + col_feats_src = copy.deepcopy(col_feats["features"][0]) # for later validation field_props = { - "properties.result": "A * (B + C) / properties.data", # = 21 + "properties.result": "(A * (B + C) / properties.data) ^ 2", # intermediate variables, should not be set - "B": 123, - "C": -50, - "A": 3, + # use random order to validate resolution priority + "B": var_B, + "C": var_C, + "A": var_A, } + forbidden_vars = set(field_props) - {"properties.result"} filter_lang = "cql2-json" filter_value = {"op": "=", "args": [{"property": "name"}, "test"]} search_body = { # note: 'properties' are not in search request, separate post-operation - "collections": ["test"], + "collections": [col_id], "filter": filter_value, "filter-lang": filter_lang, } @@ -2459,7 +2483,7 @@ def test_execute_job_with_collection_input_stac(self): "response": ExecuteResponse.DOCUMENT, "inputs": { "features": { - "collection": f"{tmp_host}/collections/test", + "collection": f"{tmp_host}/collections/{col_id}", "format": ExecuteCollectionFormat.STAC, "type": ContentType.APP_GEOJSON, "filter-lang": filter_lang, @@ -2489,12 +2513,22 @@ def test_execute_job_with_collection_input_stac(self): out_data = json.load(out_fd) assert "features" in out_data and isinstance(out_data["features"], list) + assert len(out_data["features"]) == 1 assert all("properties" in feat for feat in out_data["features"]) - out_prop = {"name": "test"} - out_prop.update(field_props) + for feat_src, feat_out in zip(col_feats["features"], out_data["features"]): - assert feat_src["properties"] == {"name": "test"} - assert feat_out["properties"] == out_prop + assert feat_src["properties"] == col_feats_src["properties"], "source data should not be modified" + assert feat_out["properties"] != col_feats_src["properties"], "result data should have been modified" + + # validate multiple locations to ensure insertion happened + # where expected and omitted where disallowed (i.e.: variables) + assert "properties.result" not in feat_out + assert "properties.result" not in feat_out["properties"] + assert "result" in feat_out["properties"] + assert feat_out["properties"]["result"] == expect_result + for var in forbidden_vars: + assert var not in feat_out + assert var not in feat_out["properties"] @pytest.mark.oap_part3 def test_execute_job_with_nested_process_properties_field_modifier(self): diff --git a/weaver/processes/builtin/collection_processor.py b/weaver/processes/builtin/collection_processor.py index 80b49da4d..6d2902a84 100644 --- a/weaver/processes/builtin/collection_processor.py +++ b/weaver/processes/builtin/collection_processor.py @@ -31,17 +31,27 @@ get_cwl_file_format, get_extension ) -from weaver.processes.builtin.field_modifier_processor import process_field_modifiers # isort:skip # noqa: E402 +from weaver.processes.builtin.field_modifier_processor import ( # isort:skip # noqa: E402 + NestedDictHandler, + process_field_modifiers +) from weaver.processes.builtin.utils import ( # isort:skip # noqa: E402 get_package_details, is_geojson_url, validate_reference ) -from weaver.utils import Lazify, get_any_id, load_file, repr_json, request_extra # isort:skip # noqa: E402 +from weaver.utils import ( # isort:skip # noqa: E402 + Lazify, + get_any_id, + get_secure_filename, + load_file, + repr_json, + request_extra +) from weaver.wps_restapi import swagger_definitions as sd # isort:skip # noqa: E402 if TYPE_CHECKING: - from typing import List + from typing import Dict, List from pystac import Asset @@ -70,7 +80,7 @@ OUTPUT_CWL_JSON = "cwl.output.json" -def process_collection(collection_input, input_definition, output_dir, logger=LOGGER): # pylint: disable=R1260 +def process_collection(collection_input, input_definition, output_dir, logger=LOGGER): # type: (JobValueCollection, ProcessInputOutputItem, Path, LoggerHandler) -> List[CWL_IO_FileValue] """ Processor of a :term:`Collection`. @@ -143,131 +153,24 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO ) resolved_files = [] if col_fmt == ExecuteCollectionFormat.GEOJSON: - # static GeoJSON FeatureCollection document - col_resp = request_extra( - "GET", - col_href, - queries=col_args, - headers={"Accept": f"{ContentType.APP_GEOJSON},{ContentType.APP_JSON}"}, - timeout=col_args["timeout"], - retries=3, - only_server_errors=False, - ) - col_json = col_resp.json() - if not (col_resp.status_code == 200 and "features" in col_json): - raise ValueError(f"Could not parse [{col_href}] as a GeoJSON FeatureCollection.") - - col_json = process_field_modifiers(col_json, **col_args) - - for i, feat in enumerate(col_json["features"]): - path = os.path.join(output_dir, f"feature-{i}.geojson") - with open(path, mode="w", encoding="utf-8") as file: - json.dump(feat, file) - _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) - file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} - resolved_files.append(file_obj) + geojson_files = resolve_static_geojson_features_files(col_href, col_args, output_dir) + resolved_files.extend(geojson_files) elif col_fmt == ExecuteCollectionFormat.STAC: - # convert all parameters to their corresponding name of the query utility, and ignore unknown ones - for arg in list(col_args): - if "-" in arg: - col_args[arg.replace("-", "_")] = col_args.pop(arg) - - timeout = col_args.pop("timeout", 10) - col_properties = col_args.pop("properties", None) # will become 'modifier' callable - - known_params = set(inspect.signature(ItemSearch).parameters) - known_params -= {"url", "method", "stac_io", "client", "collections", "ids"} - unknown_params = set(col_args) - known_params - for param in unknown_params: - col_args.pop(param) - - # STAC client can be-process filters and sorting server-side - # only perform the remaining properties modifier operations locally - if col_properties: - def col_field_modifier(stac_feat_col): - for _feat in stac_feat_col.get("features", []): - process_field_modifiers(_feat, properties=col_properties) - else: - col_field_modifier = None - - search = ItemSearch( - url=f"{api_url}/search", - method="POST", - stac_io=StacApiIO(timeout=timeout, max_retries=3), # FIXME: add 'headers' with authorization/cookies? - collections=col_id, - modifier=col_field_modifier, - **col_args - ) - for item in search.items(): - for ctype in col_media_type: - for _, asset in item.get_assets(media_type=ctype): # type: (..., Asset) - _, file_fmt = get_cwl_file_format(ctype) - file_obj = {"class": "File", "path": asset.href, "format": file_fmt} - resolved_files.append(file_obj) + stac_files = resolve_stac_collection_files(api_url, col_id, col_args, col_media_type, output_dir) + resolved_files.extend(stac_files) elif col_fmt == ExecuteCollectionFormat.OGC_FEATURES: - if str(col_args.get("filter-lang")) == "cql2-json": - col_args["cql"] = col_args.pop("filter") - search = Features( - url=api_url, - # FIXME: add 'auth' or 'headers' authorization/cookies? - headers={"Accept": f"{ContentType.APP_GEOJSON}, {ContentType.APP_VDN_GEOJSON}, {ContentType.APP_JSON}"}, - ) - items = search.collection_items(col_id, **col_args) - if items.get("type") != "FeatureCollection" or "features" not in items: - raise ValueError( - f"Collection [{col_href}] using format [{col_fmt}] did not return a GeoJSON FeatureCollection." - ) - for i, feat in enumerate(items["features"]): - # NOTE: - # since STAC is technically OGC API - Features compliant, both can be used interchangeably - # if media-types are non-GeoJSON and happen to contain STAC Assets, handle it as STAC transparently - if "assets" in feat and col_media_type != [ContentType.APP_GEOJSON]: - for _, asset in feat["assets"].items(): # type: (str, JSON) - if asset["type"] in col_media_type: - _, file_fmt = get_cwl_file_format(asset["type"]) - file_obj = {"class": "File", "path": asset["href"], "format": file_fmt} - resolved_files.append(file_obj) - else: - path = os.path.join(output_dir, f"feature-{i}.geojson") - with open(path, mode="w", encoding="utf-8") as file: - json.dump(feat, file) - _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) - file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} - resolved_files.append(file_obj) + feat_files = resolve_ogc_api_features_files(api_url, col_id, col_args, col_media_type, output_dir) + resolved_files.extend(feat_files) elif col_fmt == ExecuteCollectionFormat.OGC_COVERAGE: - cov = Coverages( - url=api_url, - # FIXME: add 'auth' or 'headers' authorization/cookies? - headers={"Accept": ContentType.APP_JSON}, - ) - ctype = (col_media_type or [ContentType.IMAGE_COG])[0] - ext = get_extension(ctype, dot=False) - path = os.path.join(output_dir, f"map.{ext}") - with open(path, mode="wb") as file: - data = cast(io.BytesIO, cov.coverage(col_id)).getbuffer() - file.write(data) # type: ignore - _, file_fmt = get_cwl_file_format(ctype) - file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} - resolved_files.append(file_obj) + cov_files = resolve_ogc_api_coverages_files(api_url, col_id, col_args, col_media_type, output_dir) + resolved_files.extend(cov_files) elif col_fmt in ExecuteCollectionFormat.OGC_MAP: - maps = Maps( - url=api_url, - # FIXME: add 'auth' or 'headers' authorization/cookies? - headers={"Accept": ContentType.APP_JSON}, - ) - ctype = (col_media_type or [ContentType.IMAGE_COG])[0] - ext = get_extension(ctype[0], dot=False) - path = os.path.join(output_dir, f"map.{ext}") - with open(path, mode="wb") as file: - data = cast(io.BytesIO, maps.map(col_id)).getbuffer() - file.write(data) # type: ignore - _, file_fmt = get_cwl_file_format(ctype) - file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} - resolved_files.append(file_obj) + maps_files = resolve_ogc_api_maps_files(api_url, col_id, col_args, col_media_type, output_dir) + resolved_files.extend(maps_files) else: raise ValueError(f"Collection [{col_href}] could not be resolved. Unknown format [{col_fmt}].") @@ -285,6 +188,189 @@ def col_field_modifier(stac_feat_col): return resolved_files +def resolve_stac_collection_files( + api_url, # type: str + collection_id, # type: str + collection_args, # type: Dict[str, JSON] + supported_media_types, # type: List[str] + output_dir, # type: str +): # type: (...) -> List[CWL_IO_FileValue] + + # convert all parameters to their corresponding name of the query utility, and ignore unknown ones + for arg in list(collection_args): + if "-" in arg: + collection_args[arg.replace("-", "_")] = collection_args.pop(arg) + + timeout = collection_args.pop("timeout", 10) + col_properties = collection_args.pop("properties", None) # will become 'modifier' callable + + known_params = set(inspect.signature(ItemSearch).parameters) + known_params -= {"url", "method", "stac_io", "client", "collections", "ids"} + unknown_params = set(collection_args) - known_params + for param in unknown_params: + collection_args.pop(param) + + # STAC can perform filters and sorting server-side + # only perform the remaining properties modifier operations locally as needed + if col_properties: + def col_field_modifier(stac_feat_col): + for _feat in stac_feat_col.get("features", []): + handler = NestedDictHandler(_feat) + process_field_modifiers(_feat, properties=col_properties, property_handler=handler) + else: + col_field_modifier = None + + resolved_files = [] + search = ItemSearch( + url=f"{api_url}/search", + method="POST", + stac_io=StacApiIO(timeout=timeout, max_retries=3), # FIXME: add 'headers' with authorization/cookies? + collections=collection_id, + modifier=col_field_modifier, + **collection_args + ) + for item in search.items(): + for ctype in supported_media_types: + for _, asset in item.get_assets(media_type=ctype): # type: (..., Asset) + _, file_fmt = get_cwl_file_format(ctype) + file_obj = {"class": "File", "path": asset.href, "format": file_fmt} + resolved_files.append(file_obj) + else: + # allow resolution of the STAC feature itself if no specific asset media-type was matched + # (i.e.: accessing the collection as if it was a plain "OGC API - Features") + if ContentType.APP_GEOJSON in supported_media_types: + # already retrieved the contents, so might as well use them to avoid the extra request + # also, content is necessary to use the updated data in case 'properties' were injected + item_id = get_secure_filename(item.id) + path = os.path.join(output_dir, f"{item_id}.geojson") + with open(path, mode="w", encoding="utf-8") as file: + json.dump(item.to_dict(), file) + _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + resolved_files.append(file_obj) + + return resolved_files + + +def resolve_static_geojson_features_files( + collection_href, # type: str + collection_args, # type: Dict[str, JSON] + output_dir, # type: str +): # type: (...) -> List[CWL_IO_FileValue] + col_resp = request_extra( + "GET", + collection_href, + queries=collection_args, + headers={"Accept": f"{ContentType.APP_GEOJSON},{ContentType.APP_JSON}"}, + timeout=collection_args["timeout"], + retries=3, + only_server_errors=False, + ) + col_json = col_resp.json() + if not (col_resp.status_code == 200 and "features" in col_json): + raise ValueError(f"Could not parse [{collection_href}] as a GeoJSON FeatureCollection.") + + col_json = process_field_modifiers(col_json, **collection_args) + + resolved_files = [] + for i, feat in enumerate(col_json["features"]): + path = os.path.join(output_dir, f"feature-{i}.geojson") + with open(path, mode="w", encoding="utf-8") as file: + json.dump(feat, file) + _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + resolved_files.append(file_obj) + return resolved_files + + +def resolve_ogc_api_features_files( + api_url, # type: str + collection_id, # type: str + collection_args, # type: Dict[str, JSON] + supported_media_types, # type: List[str] + output_dir, # type: str +): # type: (...) -> List[CWL_IO_FileValue] + if str(collection_args.get("filter-lang")) == "cql2-json": + collection_args["cql"] = collection_args.pop("filter") + search = Features( + url=api_url, + # FIXME: add 'auth' or 'headers' authorization/cookies? + headers={"Accept": f"{ContentType.APP_GEOJSON}, {ContentType.APP_VDN_GEOJSON}, {ContentType.APP_JSON}"}, + ) + items = search.collection_items(collection_id, **collection_args) + if items.get("type") != "FeatureCollection" or "features" not in items: + raise ValueError( + f"Collection [{api_url}/collections/{collection_id}] using " + f"format [{ExecuteCollectionFormat.OGC_FEATURES}] did not return a GeoJSON FeatureCollection." + ) + + resolved_files = [] + for i, feat in enumerate(items["features"]): + # NOTE: + # since STAC is technically OGC API - Features compliant, both can be used interchangeably + # if media-types are non-GeoJSON and happen to contain STAC Assets, handle it as STAC transparently + if "assets" in feat and supported_media_types != [ContentType.APP_GEOJSON]: + for _, asset in feat["assets"].items(): # type: (str, JSON) + if asset["type"] in supported_media_types: + _, file_fmt = get_cwl_file_format(asset["type"]) + file_obj = {"class": "File", "path": asset["href"], "format": file_fmt} + resolved_files.append(file_obj) + else: + path = os.path.join(output_dir, f"feature-{i}.geojson") + with open(path, mode="w", encoding="utf-8") as file: + json.dump(feat, file) + _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + resolved_files.append(file_obj) + return resolved_files + + +def resolve_ogc_api_coverages_files( + api_url, # type: str + collection_id, # type: str + collection_args, # type: Dict[str, JSON] + supported_media_types, # type: List[str] + output_dir, # type: str +): # type: (...) -> List[CWL_IO_FileValue] + cov = Coverages( + url=api_url, + # FIXME: add 'auth' or 'headers' authorization/cookies? + headers={"Accept": ContentType.APP_JSON}, + ) + ctype = (supported_media_types or [ContentType.IMAGE_COG])[0] + ext = get_extension(ctype, dot=False) + path = os.path.join(output_dir, f"map.{ext}") + with open(path, mode="wb") as file: + data = cast(io.BytesIO, cov.coverage(collection_id)).getbuffer() + file.write(data) # type: ignore + _, file_fmt = get_cwl_file_format(ctype) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + return [file_obj] + + +def resolve_ogc_api_maps_files( + api_url, # type: str + collection_id, # type: str + collection_args, # type: Dict[str, JSON] + supported_media_types, # type: List[str] + output_dir, # type: str +): # type: (...) -> List[CWL_IO_FileValue] + maps = Maps( + url=api_url, + # FIXME: add 'auth' or 'headers' authorization/cookies? + headers={"Accept": ContentType.APP_JSON}, + ) + ctype = (supported_media_types or [ContentType.IMAGE_COG])[0] + ext = get_extension(ctype[0], dot=False) + path = os.path.join(output_dir, f"map.{ext}") + with open(path, mode="wb") as file: + data = cast(io.BytesIO, maps.map(collection_id)).getbuffer() + file.write(data) # type: ignore + _, file_fmt = get_cwl_file_format(ctype) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + return [file_obj] + + def process_cwl(collection_input, input_definition, output_dir): # type: (JobValueCollection, ProcessInputOutputItem, Path) -> CWL_IO_ValueMap files = process_collection(collection_input, input_definition, output_dir) diff --git a/weaver/processes/builtin/field_modifier_processor.py b/weaver/processes/builtin/field_modifier_processor.py index acd6a2e96..448ec4ead 100644 --- a/weaver/processes/builtin/field_modifier_processor.py +++ b/weaver/processes/builtin/field_modifier_processor.py @@ -2,6 +2,7 @@ """ Generates properties contents using the specified input definitions. """ +import abc import argparse import functools import json @@ -34,10 +35,10 @@ # pylint: disable=C0413,wrong-import-order from weaver.formats import ContentType, get_cwl_file_format # isort:skip # noqa: E402 from weaver.processes.builtin.utils import get_package_details # isort:skip # noqa: E402) -from weaver.utils import Lazify, load_file, repr_json # isort:skip # noqa: E402 +from weaver.utils import Lazify, fully_qualified_name, load_file, repr_json # isort:skip # noqa: E402 if TYPE_CHECKING: - from typing import Any, Callable, Dict, TypeAlias + from typing import Callable, Dict, List, Optional, Tuple, TypeAlias, TypeVar, Union from weaver.typedefs import ( CWL_IO_ValueMap, @@ -52,8 +53,9 @@ ) from weaver.utils import LoggerHandler - PropertyGetter: TypeAlias = Callable[[object, str], Any] - PropertySetter: TypeAlias = Callable[[object, str, Any], None] + PropertyValue = TypeVar("PropertyValue") + PropertyVariable = Union[PropertyValue, Number] + PropertyGetter: TypeAlias = Callable[[str], PropertyVariable] PACKAGE_NAME, PACKAGE_BASE, PACKAGE_MODULE = get_package_details(__file__) @@ -70,6 +72,84 @@ OUTPUT_CWL_JSON = "cwl.output.json" +class PropertyHandler(abc.ABC): + def __init__(self, instance): + self.instance = instance + + @abc.abstractmethod + def __getattr__(self, property_name): + # type: (str) -> PropertyValue + raise NotImplementedError + + @abc.abstractmethod + def __setattr__(self, property_name, value): + # type: (str, PropertyValue) -> None + raise NotImplementedError + + @abc.abstractmethod + def __delattr__(self, property_name): + # type: (str) -> None + raise NotImplementedError + + @abc.abstractmethod + def __iter__(self): + # type: () -> List[PropertyValue] + raise NotImplementedError + + +class DictHandler(dict, PropertyHandler): + """ + Operates on dictionary properties directly by key names. + """ + def __getattr__(self, property_name): + return dict.__getitem__(self, property_name) + + +class NestedDictHandler(dict, PropertyHandler): + """ + Operates on dictionary properties with nested key names separated by dots (``.``). + + .. code-block:: python + + handler = NestedDictHandler({"A": {"B": 123}}) + handler.__getattr__("A.B") # returns 123 + handler.__setattr__("A.B", 456) # updates {"A": {"B": 456}} + """ + def _get_parent_nested(self, property_name): + # type: (str) -> Tuple[Optional[Union[Dict[str, JSON]]], Optional[Union[str, int]]] + props = property_name.split(".") + data = self + for idx, prop in enumerate(props): + if isinstance(data, dict): + data = data.get(prop) + elif isinstance(data, list) and str.isnumeric(prop): + data = data[int(prop)] + else: + raise ValueError(f"Invalid property accessor unresolved: {property_name}") + if idx >= len(props) - 2: + if data is None: + break + last = props[-1] + last = int(last) if str.isnumeric(last) else last + return data, last + return None, None + + def __getattr__(self, property_name): + data, last = self._get_parent_nested(property_name) + if last is not None: + return data[last] + + def __setattr__(self, property_name, value): + data, last = self._get_parent_nested(property_name) + if last is not None: + data[last] = value + + def __delattr__(self, property_name): + data, last = self._get_parent_nested(property_name) + if last is not None: + del data[last] + + @functools.lru_cache(1024) def create_expression_parser(): # pylint: disable=R1260,too-complex # type: () -> ParserElement @@ -94,11 +174,12 @@ def __init__(self, tokens): self.value = tokens[0] def eval(self, variables, getter): - # type: (Dict[str, Number], PropertyGetter) -> Number + # type: (Dict[str, Union[PropertyValue, Number]], PropertyGetter) -> Union[PropertyValue, Number] if self.value in variables: - return getter(variables, self.value) - else: + return variables[self.value] + elif str(self.value).isnumeric(): return float(self.value) + return getter(self.value) class EvalSignOp: """ @@ -109,7 +190,7 @@ def __init__(self, tokens): self.sign, self.value = tokens[0] def eval(self, variables, getter): - # type: (Dict[str, Number], PropertyGetter) -> Number + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable mult = {"+": 1, "-": -1}[self.sign] return mult * self.value.eval(variables, getter) @@ -133,7 +214,7 @@ def __init__(self, tokens): self.value = tokens[0] def eval(self, variables, getter): - # type: (Dict[str, Number], PropertyGetter) -> Number + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable res = self.value[-1].eval(variables, getter) for val in self.value[-3::-2]: res = val.eval(variables, getter) ** res @@ -148,7 +229,7 @@ def __init__(self, tokens): self.value = tokens[0] def eval(self, variables, getter): - # type: (Dict[str, Number], PropertyGetter) -> Number + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable prod = self.value[0].eval(variables, getter) for op, val in operator_operands(self.value[1:]): if op == "*": @@ -166,7 +247,7 @@ def __init__(self, tokens): self.value = tokens[0] def eval(self, variables, getter): - # type: (Dict[str, Number], PropertyGetter) -> Number + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable _sum = self.value[0].eval(variables, getter) for op, val in operator_operands(self.value[1:]): if op == "+": @@ -199,7 +280,7 @@ def __init__(self, tokens): self.value = tokens[0] def eval(self, variables, getter): - # type: (Dict[str, Number], PropertyGetter) -> Number + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable val1 = self.value[0].eval(variables, getter) for op, val in operator_operands(self.value[1:]): fn = EvalComparisonOp.opMap[op] @@ -247,40 +328,33 @@ def eval(self, variables, getter): def evaluate_property( properties, # type: Dict[str, JSON] - property_name, # type: str property_expression, # type: str - property_getter=None, # type: PropertyGetter - property_setter=None, # type: PropertySetter -): # type: (...) -> None + property_handler, # type: PropertyHandler +): # type: (...) -> PropertyVariable """ - Evaluates the applicable property expression with variable retrieval and insertion in the destination object. + Evaluates the applicable property expression with variable retrieval from reference data using the specifed handler. :param properties: Mapping of available property variables and expressions. - :param property_name: Target property to evaluate. :param property_expression: Calculation to be evaluated for the property, possibly referring to other properties. - :param property_getter: Function that knows how to retrieve a variable property from the properties' destination. - :param property_setter: Function that knows how to insert the evaluated property into the properties' destination. + :param property_handler: Implementation that knows how to manipulate property access from the data. """ - property_getter = property_getter or dict.__getitem__ - property_setter = property_setter or dict.__setitem__ - - expr = create_expression_parser() - result = expr.parse_string(property_expression)[0].eval(properties, getter=property_getter) - property_setter(properties, property_name, result) + parser = create_expression_parser() + expr = parser.parse_string(property_expression)[0] + result = expr.eval(properties, getter=property_handler.__getattr__) + return result def process_field_modifiers( - values, # type: Dict[str, JSON] + values, # type: PropertyValue *, # force named keyword arguments after filter_expr=None, # type: FieldModifierFilterExpression, filter_crs=None, # type: FieldModifierFilterCRS, filter_lang=None, # type: FieldModifierFilterLang, properties=None, # type: FieldModifierProperties, - property_getter=None, # type: PropertyGetter - property_setter=None, # type: PropertySetter + property_handler=None, # type: PropertyHandler sortby=None, # type: FieldModifierSortBy, logger=LOGGER, # type: LoggerHandler -): # type: (...) -> JSON +): # type: (...) -> Dict[str, PropertyValue] """ Processor of field modifiers for an input or output. @@ -294,8 +368,7 @@ def process_field_modifiers( :param filter_lang: Filter language to interpret the filter expression. :param filter_crs: Filter Coordinate Reference System (CRS) to employ with the filter expression. :param properties: Properties definition submitted to the process and to be generated from input values. - :param property_getter: Function that knows how to retrieve a variable property from the properties' destination. - :param property_setter: Function that knows how to insert the evaluated property into the properties' destination. + :param property_handler: Implementation that knows how to manipulate property access for the data destination. :param sortby: Sorting definition with relevant field names and ordering direction. :param logger: Optional logger handler to employ. :return: File reference containing the resolved properties. @@ -308,6 +381,7 @@ def process_field_modifiers( " filter_crs=%s\n" " filter_lang=%s\n" " properties=%s\n" + " property_handler=%s\n" " sortby=%s\n" " values=%s" ), @@ -316,33 +390,63 @@ def process_field_modifiers( Lazify(lambda: repr_json(filter_crs, indent=2)), Lazify(lambda: repr_json(filter_lang, indent=2)), Lazify(lambda: repr_json(properties, indent=2)), + Lazify(lambda: fully_qualified_name(property_handler) if property_handler else None), Lazify(lambda: repr_json(sortby, indent=2)), Lazify(lambda: repr_json(values, indent=2)), ) properties = properties or {} - # sort properties later if they depend on other ones, the least dependencies to be computed first - props_deps = {prop: 0 for prop in properties} - for prop, calc in properties.items(): - for prop_dep in props_deps: - if prop == prop_dep: - if prop in calc: - raise ValueError(f"Invalid recursive property [{prop}] references itself.") - continue - if prop_dep in calc: - props_deps[prop_dep] += 1 - if not filter(lambda dep: dep[-1] == 0, props_deps.items()): - raise ValueError("Invalid properties all depend on another one. Impossible resolution order.") - props = sorted( - list(properties.items()), - key=lambda p: props_deps[p[0]] - ) - - # compute the properties - for prop, calc in props: - evaluate_property(properties, prop, calc, property_getter=property_getter, property_setter=property_setter) - - # FIXME: handle filtering and sorting + # if unspecified, consider the values as dict with handler + # (values will be set/retrieved directly by literal keys) + # alternate value handler need to be specified explicitly + if not property_handler: + property_handler = values + + # FIXME: handle filtering + # define a meta-set operation wrapping the property handler ? + # eg: for each 'feature', do 'properties', but 'filter'/'sortby' over entire set of 'features' + + # properties are a filtering subset + if isinstance(properties, list): + available_properties = set(property_handler) + requested_properties = set(properties) + for prop in available_properties - requested_properties: + del property_handler[prop] + + # otherwise, properties are expressions + else: + # sort properties later if they depend on other ones, the least dependencies to be computed first + props_deps = {prop: 0 for prop in properties} + for prop, calc in properties.items(): + for prop_dep in props_deps: + if prop == prop_dep: + if prop in str(calc): + raise ValueError(f"Invalid recursive property [{prop}] references itself.") + continue + if prop_dep in str(calc): + props_deps[prop_dep] += 1 + if not filter(lambda dep: dep[-1] == 0, props_deps.items()): + raise ValueError("Invalid properties all depend on another one. Impossible resolution order.") + props = sorted( + list(properties.items()), + key=lambda p: props_deps[p[0]], + reverse=True, + ) + + # compute the properties + for prop, calc in props: + result = evaluate_property( + properties, + str(calc), + property_handler=property_handler, + ) + # make resolved values available for next iterations + # the 'properties' become a temporary variable buffer for cross-reference + # the 'property_handler' decides if that result should be inserted or not + properties[prop] = result + property_handler.__setattr__(prop, result) + + # FIXME: handle sorting - over set of objects containing 'properties' return properties diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 6c97a0949..bff3b7e63 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -436,7 +436,9 @@ class CWL_SchemaName(Protocol): ] FieldModifierFilterCRS = str FieldModifierFilterLang = str - FieldModifierProperties = Dict[str, str] + FieldModifierPropertiesFilter = List[str] + FieldModifierPropertiesExpr = Dict[str, str] + FieldModifierProperties = Union[FieldModifierPropertiesExpr, FieldModifierPropertiesFilter] FieldModifierSortByItem = TypedDict("FieldModifierSortByItem", { "field": str, "direction": Literal["asc", "desc"], diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 6f831e51f..e3843b71c 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -1608,25 +1608,52 @@ def deserialize(self, cstruct): return result +class AnyPropertyExpression(AnyOfKeywordSchema): + _any_of = [ + PermissiveMappingSchema(validator=Length(min=1)), + ExtendedSchemaNode(Float()), + ExtendedSchemaNode(Integer()), + ExtendedSchemaNode(String(), validator=Length(min=1)), + ] + + +class PropertyFiltering(ExtendedSequenceSchema): + description = "Properties to filter from available data." + prop = ExtendedSchemaNode(String(), validator=Length(min=1)) + validator = Length(min=1) + + class PropertiesExpression(ExtendedMappingSchema): - prop = AnyFilterExpression( + description = "Properties to compute from available data." + prop = AnyPropertyExpression( variable="{property}", description="Expression that defines how to compute the property.", - validator=Length(min=1), ) + validator = Length(min=1) + + +class PropertiesDefinition(OneOfKeywordSchema): + summary = "Properties to retrieve or compute." + description = "Properties produced from the data in the context of where they are specified." + _one_of = [ + PropertyFiltering(), + PropertiesExpression(), + ] class PropertiesSchema(ExtendedMappingSchema): - properties = PropertiesExpression(missing=drop, validator=Length(min=1)) + properties = PropertiesDefinition(missing=drop) def deserialize(self, cstruct): result = super().deserialize(cstruct) - if "properties" in cstruct and "properties" not in result: - raise colander.Invalid( - node=self, - msg="Invalid properties expression could not be interpreted.", - value={"properties": repr_json(cstruct["properties"])}, - ) + if "properties" in cstruct: + props = (result or {}).get("properties") or {} + if cstruct["properties"] != props: + raise colander.Invalid( + node=self, + msg="Invalid properties definition could not be interpreted.", + value={"properties": repr_json(cstruct["properties"])}, + ) return result From 8e4ebed43ff100166c8a6ca88631f5b1b1d98512 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Fri, 15 Nov 2024 22:14:40 -0500 Subject: [PATCH 11/11] fix properties schema deserialization --- weaver/wps_restapi/swagger_definitions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index b211c0226..119a19be5 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -1647,7 +1647,7 @@ class PropertiesSchema(ExtendedMappingSchema): def deserialize(self, cstruct): result = super().deserialize(cstruct) if "properties" in cstruct: - props = (result or {}).get("properties") or {} + props = (result or {}).get("properties") if cstruct["properties"] != props: raise colander.Invalid( node=self,