diff --git a/requirements.txt b/requirements.txt index 1702261d4..4e3a4eb9e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -90,6 +90,7 @@ pygeofilter[fes,backend-native] # - https://github.com/celery/kombu/pull/1536 # - https://github.com/celery/celery/pull/7834 pymongo>=4.6.3 # either (pymongo>=4, kombu>=5.3.0b2) or (pymongo<4, celery<5.2) +pyparsing pyramid>=1.7.3 pyramid_beaker>=0.8 # see https://github.com/sontek/pyramid_celery/pull/102 to fix Python 3.12 support and other improvements diff --git a/tests/functional/test_builtin.py b/tests/functional/test_builtin.py index dd41a5270..23640eed7 100644 --- a/tests/functional/test_builtin.py +++ b/tests/functional/test_builtin.py @@ -20,7 +20,13 @@ ) from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode from weaver.formats import ContentEncoding, ContentType, get_format, repr_json -from weaver.processes.builtin import file_index_selector, jsonarray2netcdf, metalink2netcdf, register_builtin_processes +from weaver.processes.builtin import ( + field_modifier_processor, + file_index_selector, + jsonarray2netcdf, + metalink2netcdf, + register_builtin_processes +) from weaver.processes.constants import JobInputsOutputsSchema from weaver.status import Status from weaver.utils import create_metalink, fully_qualified_name, get_path_kvp @@ -1099,3 +1105,11 @@ def test_file_index_selector_invalid_out_dir(): with pytest.raises(ValueError) as err: file_index_selector.main("-f", "", "-i", "1", "-o", tmp_out_dir) assert "does not exist" in str(err.value) + + +def test_field_modifier_processor_expression_variables(): + expr = field_modifier_processor.create_expression_parser() + calc = "properties.eo:cloud_cover + 1" + var = {"properties.eo:cloud_cover": 2} + val = expr.parse_string(calc)[0].eval(var, dict.__getitem__) + assert val == 3.0 diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index f3d59e207..c500579d4 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -79,7 +79,7 @@ from weaver.wps_restapi import swagger_definitions as sd if TYPE_CHECKING: - from typing import List + from typing import Dict, List from responses import RequestsMock @@ -1185,6 +1185,7 @@ def test_deploy_block_builtin_processes_from_api(self): process = self.process_store.fetch_by_id(self._testMethodName) assert process.type == ProcessType.APPLICATION + @pytest.mark.oap_part2 @parameterized.expand([ # not allowed even if combined with another known and valid definition ({"UnknownRequirement": {}, CWL_REQUIREMENT_APP_DOCKER: {"dockerPull": "python:3.7-alpine"}}, ), @@ -1239,6 +1240,7 @@ def test_deploy_requirement_inline_javascript(self): _, cwl = self.deploy_process(body, describe_schema=ProcessSchema.OGC) assert CWL_REQUIREMENT_INLINE_JAVASCRIPT in cwl["requirements"] + @pytest.mark.oap_part2 def test_deploy_merge_complex_io_with_multiple_formats_and_defaults(self): """ Test validates that different format types are set on different input variations simultaneously: @@ -1586,6 +1588,7 @@ def test_deploy_merge_complex_io_with_multiple_formats_and_defaults(self): assert pkg["outputs"][3]["type"] == {"type": "array", "items": "File"} assert "format" not in pkg["outputs"][3], "CWL format array not allowed for outputs." 
+ @pytest.mark.oap_part2 def test_deploy_merge_resolution_io_min_max_occurs(self): """ Test validates that various merging/resolution strategies of I/O definitions are properly applied for @@ -1755,6 +1758,7 @@ def test_deploy_merge_resolution_io_min_max_occurs(self): assert pkg["inputs"][13]["id"] == "optional_array_max_fixed_by_wps" # assert pkg["inputs"][13]["type"] == "string[]?" + @pytest.mark.oap_part2 def test_deploy_merge_valid_io_min_max_occurs_as_str_or_int(self): """ Test validates that I/O definitions with ``minOccurs`` and/or ``maxOccurs`` are permitted as both integer and @@ -1935,6 +1939,7 @@ def test_execute_job_with_accept_languages(self): "Expected error description to indicate bad language" ) + @pytest.mark.oap_part1 @mocked_aws_config @mocked_aws_s3 @mocked_http_file @@ -2111,6 +2116,7 @@ def tmp(value): assert processed_values["test_reference_http_value"] == "THIS IS A GENERATED FILE FOR HTTP TEST" assert processed_values["test_reference_file_value"] == "THIS IS A GENERATED FILE FOR FILE TEST" + @pytest.mark.oap_part1 def test_execute_job_with_inline_input_values(self): """ Validates that the job can receive an object and array types inputs and process them as expected. @@ -2246,6 +2252,7 @@ def test_execute_job_with_inline_input_values(self): assert processed_values["measureFloatInput"] == 10.2 assert processed_values["measureFileInput"] == {"VALUE": {"REF": 1, "MEASUREMENT": 10.3, "UOM": "M"}} + @pytest.mark.oap_part1 def test_execute_job_with_bbox(self): body = self.retrieve_payload("EchoBoundingBox", "deploy", local=True) proc = self.fully_qualified_test_name(self._testMethodName) @@ -2281,7 +2288,13 @@ def test_execute_job_with_bbox(self): "Expected the BBOX CRS URI to be interpreted and validated by known WPS definitions." ) + @pytest.mark.oap_part3 def test_execute_job_with_collection_input_geojson_feature_collection(self): + """ + Validate parsing and handling of ``collection`` specified in an input reference to a :term:`GeoJSON` file. + + .. versionadded:: 5.8 + """ name = "EchoFeatures" body = self.retrieve_payload(name, "deploy", local=True) proc = self.fully_qualified_test_name(self._testMethodName) @@ -2331,12 +2344,18 @@ def test_execute_job_with_collection_input_geojson_feature_collection(self): out_data = json.load(out_fd) assert out_data["features"] == col_feats["features"] + @pytest.mark.oap_part3 @parameterized.expand([ # note: the following are not *actually* filtering, but just validating formats are respected across code paths ("POST", "cql2-json", {"op": "=", "args": [{"property": "name"}, "test"]}), ("GET", "cql2-text", "property.name = 'test'"), ]) def test_execute_job_with_collection_input_ogc_features(self, filter_method, filter_lang, filter_value): + """ + Validate parsing and handling of ``collection`` specified in an input with an `OGC API - Features` reference. + + .. versionadded:: 5.8 + """ name = "EchoFeatures" body = self.retrieve_payload(name, "deploy", local=True) proc = self.fully_qualified_test_name(self._testMethodName) @@ -2394,6 +2413,202 @@ def test_execute_job_with_collection_input_ogc_features(self, filter_method, fil out_data = json.load(out_fd) assert out_data["features"] == col_feats["features"] + @pytest.mark.oap_part3 + def test_execute_job_with_collection_input_stac_as_features_with_properties(self): + """ + Validate parsing and handling of ``collection`` specified in an input with :term:`STAC` :term:`API` endpoint. + + .. versionadded:: 5.8 + .. 
versionchanged:: 6.0 + Evaluate the dynamic insertion of a ``properties`` definition into the retrieved collection features. + """ + name = "EchoFeatures" + body = self.retrieve_payload(name, "deploy", local=True) + proc = self.fully_qualified_test_name(self._testMethodName) + self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc) + + with contextlib.ExitStack() as stack: + tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames + tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # pylint: disable=R1732 + tmp_svr = stack.enter_context( + mocked_file_server(tmp_dir, tmp_host, settings=self.settings, mock_browse_index=True) + ) + exec_body_val = self.retrieve_payload(name, "execute", local=True) + col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON + + # patch the original content to make it respect STAC validation + col_id = "test" + for idx, feat in enumerate(col_feats["features"]): + feat.update({ + "stac_version": "1.0.0", + "stac_extensions": [], + "collection": col_id, + "id": f"{col_id}-{idx}", + "assets": {}, + }) + feat["properties"].update({ + "datetime": "2024-01-01T00:00:00Z", + }) + + # setup input collection properties + var_A = 3 + var_B = 123 + var_C = -63 + var_data = 10 + expect_result = (var_A * (var_B + var_C) / var_data) ** 2 # = 324 + col_feats["features"][0]["properties"]["data"] = var_data + col_feats_src = copy.deepcopy(col_feats["features"][0]) # for later validation + field_props = { + "properties.result": "(A * (B + C) / properties.data) ^ 2", + # intermediate variables, should not be set + # use random order to validate resolution priority + "B": var_B, + "C": var_C, + "A": var_A, + } + forbidden_vars = set(field_props) - {"properties.result"} + filter_lang = "cql2-json" + filter_value = {"op": "=", "args": [{"property": "name"}, "test"]} + search_body = { + # note: 'properties' are not in search request, separate post-operation + "collections": [col_id], + "filter": filter_value, + "filter-lang": filter_lang, + } + search_match = responses.matchers.json_params_matcher(search_body) + tmp_svr.add("POST", f"{tmp_host}/search", json=col_feats, match=[search_match]) + + col_exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "inputs": { + "features": { + "collection": f"{tmp_host}/collections/{col_id}", + "format": ExecuteCollectionFormat.STAC, + "type": ContentType.APP_GEOJSON, + "filter-lang": filter_lang, + "filter": filter_value, + "properties": field_props, + } + } + } + + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + proc_url = f"/processes/{proc}/execution" + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, + data=col_exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + + status_url = resp.json["location"] + results = self.monitor_job(status_url) + assert "features" in results + + job_id = status_url.rsplit("/", 1)[-1] + wps_dir = get_wps_output_dir(self.settings) + job_dir = os.path.join(wps_dir, job_id) + job_out = os.path.join(job_dir, "features", "features.geojson") + assert os.path.isfile(job_out), f"Invalid output file not found: [{job_out}]" + with open(job_out, mode="r", encoding="utf-8") as out_fd: + out_data = json.load(out_fd) + + assert "features" in out_data and isinstance(out_data["features"], list) + assert len(out_data["features"]) == 1 + assert all("properties" in feat for feat 
in out_data["features"]) + + for feat_src, feat_out in zip(col_feats["features"], out_data["features"]): + assert feat_src["properties"] == col_feats_src["properties"], "source data should not be modified" + assert feat_out["properties"] != col_feats_src["properties"], "result data should have been modified" + + # validate multiple locations to ensure insertion happened + # where expected and omitted where disallowed (i.e.: variables) + assert "properties.result" not in feat_out + assert "properties.result" not in feat_out["properties"] + assert "result" in feat_out["properties"] + assert feat_out["properties"]["result"] == expect_result + for var in forbidden_vars: + assert var not in feat_out + assert var not in feat_out["properties"] + + @pytest.mark.oap_part3 + def test_execute_job_with_nested_process_properties_field_modifier(self): + """ + Validate parsing and handling of ``properties`` specified for a nested process execution. + + .. versionadded:: 6.0 + """ + name = "EchoFeatures" + body = self.retrieve_payload(name, "deploy", local=True) + proc = self.fully_qualified_test_name(self._testMethodName) + self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc) + + with contextlib.ExitStack() as stack: + tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames + tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # pylint: disable=R1732 + tmp_svr = stack.enter_context( + mocked_file_server(tmp_dir, tmp_host, settings=self.settings, mock_browse_index=True) + ) + + exec_body_val = self.retrieve_payload(name, "execute", local=True) + col_file = os.path.join(tmp_dir, "test.json") + col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON + with open(col_file, mode="w", encoding="utf-8") as tmp_feature_collection_geojson: + json.dump(col_feats, tmp_feature_collection_geojson) + + filter_value = { + "filter": "property.name = 'test'" + } + filter_match = responses.matchers.json_params_matcher(filter_value) + tmp_svr.add("POST", f"{tmp_host}/collections/test/items", json=col_feats, match=[filter_match]) + + proc_url = f"/processes/{proc}" + exec_url = f"{proc_url}/execution" + col_exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "process": proc_url, + "inputs": { + "features": { + "process": proc_url, + "inputs": { + "features": { + # accessed directly as a static GeoJSON FeatureCollection + "collection": "https://mocked-file-server.com/test.json", + "format": ExecuteCollectionFormat.GEOJSON, + "schema": "http://www.opengis.net/def/glossary/term/FeatureCollection", + "filter": "property.name = 'test'", + "properties": { + "nested": "1 / 2" + } + } + }, + "properties": { + "extra": 3.1416 + } + } + } + } + + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + resp = mocked_sub_requests(self.app, "post_json", exec_url, timeout=5, + data=col_exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + + status_url = resp.json["location"] + results = self.monitor_job(status_url) + assert "features" in results + + job_id = status_url.rsplit("/", 1)[-1] + wps_dir = get_wps_output_dir(self.settings) + job_dir = os.path.join(wps_dir, job_id) + job_out = os.path.join(job_dir, "features", "features.geojson") + assert os.path.isfile(job_out), f"Invalid output file not found: [{job_out}]" + with open(job_out, mode="r", encoding="utf-8") as out_fd: + out_data = json.load(out_fd) + 
+ assert out_data["features"] == col_feats["features"] + def test_execute_job_with_context_output_dir(self): cwl = { "cwlVersion": "v1.0", diff --git a/tests/functional/utils.py b/tests/functional/utils.py index b995d919e..6a372fc24 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -92,11 +92,8 @@ def request(cls, method, url, *args, **kwargs): def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["deploy"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool - ): # type: (...) -> Union[ProcessDeployment, Dict[str, JSON]] + **kwargs, # type: Any + ): # type: (...) -> ProcessDeployment ... @classmethod @@ -104,11 +101,8 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["describe"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool - ): # type: (...) -> Union[ProcessDescription, Dict[str, JSON]] + **kwargs, # type: Any + ): # type: (...) -> ProcessDescription ... @classmethod @@ -116,11 +110,8 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["execute", "quotation"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool - ): # type: (...) -> Union[ProcessExecution, Dict[str, JSON]] + **kwargs, # type: Any + ): # type: (...) -> ProcessExecution ... @classmethod @@ -128,10 +119,7 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["package"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool + **kwargs, # type: Any ): # type: (...) -> CWL ... @@ -140,10 +128,7 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: Literal["estimator"] - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[False] - location=None, # type: Optional[str] - local=False, # type: bool + **kwargs, # type: Any ): # type: (...) -> Dict[str, JSON] ... @@ -152,10 +137,7 @@ def retrieve_payload(cls, def retrieve_payload(cls, process, # type: str ref_type=None, # type: ReferenceType - ref_name=None, # type: Optional[str] - ref_found=False, # type: Literal[True] - location=None, # type: Optional[str] - local=False, # type: bool + **kwargs, # type: Any ): # type: (...) -> str ... @@ -378,10 +360,8 @@ def describe_process(cls, process_id, describe_schema=ProcessSchema.OGC): @overload def deploy_process(cls, payload, # type: JSON - process_id=None, # type: Optional[str] describe_schema=ProcessSchema.OGC, # type: ProcessSchemaOGCType - mock_requests_only_local=True, # type: bool - add_package_requirement=True, # type: bool + **kwargs, # type: Any ): # type: (...) -> Tuple[ProcessDescriptionMapping, CWL] ... @@ -389,10 +369,8 @@ def deploy_process(cls, @overload def deploy_process(cls, payload, # type: JSON - process_id=None, # type: Optional[str] describe_schema=ProcessSchema.OGC, # type: ProcessSchemaOLDType - mock_requests_only_local=True, # type: bool - add_package_requirement=True, # type: bool + **kwargs, # type: Any ): # type: (...) -> Tuple[ProcessDescriptionListing, CWL] ... 
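For context, the property expression exercised by the STAC test above is expected to resolve through the same safe arithmetic parser covered by the new test in tests/functional/test_builtin.py; a minimal sketch (illustrative only, not part of the patch), assuming the variables are supplied exactly as in that test:

    # evaluate "(A * (B + C) / properties.data) ^ 2" with A=3, B=123, C=-63, properties.data=10
    expr = field_modifier_processor.create_expression_parser()
    calc = "(A * (B + C) / properties.data) ^ 2"
    var = {"A": 3, "B": 123, "C": -63, "properties.data": 10}
    val = expr.parse_string(calc)[0].eval(var, dict.__getitem__)
    assert val == 324  # 3 * (123 - 63) / 10 = 18, squared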
diff --git a/tests/wps_restapi/test_swagger_definitions.py b/tests/wps_restapi/test_swagger_definitions.py index a986c4c9c..e968d3101 100644 --- a/tests/wps_restapi/test_swagger_definitions.py +++ b/tests/wps_restapi/test_swagger_definitions.py @@ -387,7 +387,19 @@ def test_collection_input_filter_lang_case_insensitive(): }, { "collection": "https://example.com/collections/test", - "sortBy": ["name"], + "sortBy": [123], + }, + { + "collection": "https://example.com/collections/test", + "properties": "ABC", + }, + { + "collection": "https://example.com/collections/test", + "properties": [], + }, + { + "collection": "https://example.com/collections/test", + "properties": {}, }, ] ) diff --git a/weaver/processes/builtin/collection_processor.py b/weaver/processes/builtin/collection_processor.py index 3fe2d3d8b..6d2902a84 100644 --- a/weaver/processes/builtin/collection_processor.py +++ b/weaver/processes/builtin/collection_processor.py @@ -31,16 +31,27 @@ get_cwl_file_format, get_extension ) +from weaver.processes.builtin.field_modifier_processor import ( # isort:skip # noqa: E402 + NestedDictHandler, + process_field_modifiers +) from weaver.processes.builtin.utils import ( # isort:skip # noqa: E402 get_package_details, is_geojson_url, validate_reference ) -from weaver.utils import Lazify, get_any_id, load_file, repr_json, request_extra # isort:skip # noqa: E402 +from weaver.utils import ( # isort:skip # noqa: E402 + Lazify, + get_any_id, + get_secure_filename, + load_file, + repr_json, + request_extra +) from weaver.wps_restapi import swagger_definitions as sd # isort:skip # noqa: E402 if TYPE_CHECKING: - from typing import List + from typing import Dict, List from pystac import Asset @@ -69,7 +80,7 @@ OUTPUT_CWL_JSON = "cwl.output.json" -def process_collection(collection_input, input_definition, output_dir, logger=LOGGER): # pylint: disable=R1260 +def process_collection(collection_input, input_definition, output_dir, logger=LOGGER): # type: (JobValueCollection, ProcessInputOutputItem, Path, LoggerHandler) -> List[CWL_IO_FileValue] """ Processor of a :term:`Collection`. 
@@ -89,7 +100,12 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO input_id = get_any_id(input_definition) logger.log( # pylint: disable=E1205 # false positive logging.INFO, - "Process [{}] Got arguments: collection_input={} output_dir=[{}], for input=[{}]", + ( + "Process [%s] got arguments:\n" + " collection_input=%s\n" + " output_dir=[%s]\n" + " input=[%s]" + ), PACKAGE_NAME, Lazify(lambda: repr_json(collection_input, indent=2)), output_dir, @@ -137,115 +153,24 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO ) resolved_files = [] if col_fmt == ExecuteCollectionFormat.GEOJSON: - # static GeoJSON FeatureCollection document - col_resp = request_extra( - "GET", - col_href, - queries=col_args, - headers={"Accept": f"{ContentType.APP_GEOJSON},{ContentType.APP_JSON}"}, - timeout=col_args["timeout"], - retries=3, - only_server_errors=False, - ) - col_json = col_resp.json() - if not (col_resp.status_code == 200 and "features" in col_json): - raise ValueError(f"Could not parse [{col_href}] as a GeoJSON FeatureCollection.") - - for i, feat in enumerate(col_json["features"]): - path = os.path.join(output_dir, f"feature-{i}.geojson") - with open(path, mode="w", encoding="utf-8") as file: - json.dump(feat, file) - _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) - file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} - resolved_files.append(file_obj) + geojson_files = resolve_static_geojson_features_files(col_href, col_args, output_dir) + resolved_files.extend(geojson_files) elif col_fmt == ExecuteCollectionFormat.STAC: - # convert all parameters to their corresponding name of the query utility, and ignore unknown ones - for arg in list(col_args): - if "-" in arg: - col_args[arg.replace("-", "_")] = col_args.pop(arg) - known_params = set(inspect.signature(ItemSearch).parameters) - known_params -= {"url", "method", "stac_io", "client", "collection", "ids", "modifier"} - for param in set(col_args) - known_params: - col_args.pop(param) - - timeout = col_args.pop("timeout", 10) - search = ItemSearch( - url=api_url, - method="POST", - stac_io=StacApiIO(timeout=timeout, max_retries=3), # FIXME: add 'headers' with authorization/cookies? - collections=col_id, - **col_args - ) - for item in search.items(): - for ctype in col_media_type: - for _, asset in item.get_assets(media_type=ctype): # type: (..., Asset) - _, file_fmt = get_cwl_file_format(ctype) - file_obj = {"class": "File", "path": asset.href, "format": file_fmt} - resolved_files.append(file_obj) + stac_files = resolve_stac_collection_files(api_url, col_id, col_args, col_media_type, output_dir) + resolved_files.extend(stac_files) elif col_fmt == ExecuteCollectionFormat.OGC_FEATURES: - if str(col_args.get("filter-lang")) == "cql2-json": - col_args["cql"] = col_args.pop("filter") - search = Features( - url=api_url, - # FIXME: add 'auth' or 'headers' authorization/cookies? - headers={"Accept": f"{ContentType.APP_GEOJSON}, {ContentType.APP_VDN_GEOJSON}, {ContentType.APP_JSON}"}, - ) - items = search.collection_items(col_id, **col_args) - if items.get("type") != "FeatureCollection" or "features" not in items: - raise ValueError( - f"Collection [{col_href}] using format [{col_fmt}] did not return a GeoJSON FeatureCollection." 
- ) - for i, feat in enumerate(items["features"]): - # NOTE: - # since STAC is technically OGC API - Features compliant, both can be used interchangeably - # if media-types are non-GeoJSON and happen to contain STAC Assets, handle it as STAC transparently - if "assets" in feat and col_media_type != [ContentType.APP_GEOJSON]: - for _, asset in feat["assets"].items(): # type: (str, JSON) - if asset["type"] in col_media_type: - _, file_fmt = get_cwl_file_format(asset["type"]) - file_obj = {"class": "File", "path": asset["href"], "format": file_fmt} - resolved_files.append(file_obj) - else: - path = os.path.join(output_dir, f"feature-{i}.geojson") - with open(path, mode="w", encoding="utf-8") as file: - json.dump(feat, file) - _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) - file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} - resolved_files.append(file_obj) + feat_files = resolve_ogc_api_features_files(api_url, col_id, col_args, col_media_type, output_dir) + resolved_files.extend(feat_files) elif col_fmt == ExecuteCollectionFormat.OGC_COVERAGE: - cov = Coverages( - url=api_url, - # FIXME: add 'auth' or 'headers' authorization/cookies? - headers={"Accept": ContentType.APP_JSON}, - ) - ctype = (col_media_type or [ContentType.IMAGE_COG])[0] - ext = get_extension(ctype, dot=False) - path = os.path.join(output_dir, f"map.{ext}") - with open(path, mode="wb") as file: - data = cast(io.BytesIO, cov.coverage(col_id)).getbuffer() - file.write(data) # type: ignore - _, file_fmt = get_cwl_file_format(ctype) - file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} - resolved_files.append(file_obj) + cov_files = resolve_ogc_api_coverages_files(api_url, col_id, col_args, col_media_type, output_dir) + resolved_files.extend(cov_files) elif col_fmt in ExecuteCollectionFormat.OGC_MAP: - maps = Maps( - url=api_url, - # FIXME: add 'auth' or 'headers' authorization/cookies? - headers={"Accept": ContentType.APP_JSON}, - ) - ctype = (col_media_type or [ContentType.IMAGE_COG])[0] - ext = get_extension(ctype[0], dot=False) - path = os.path.join(output_dir, f"map.{ext}") - with open(path, mode="wb") as file: - data = cast(io.BytesIO, maps.map(col_id)).getbuffer() - file.write(data) # type: ignore - _, file_fmt = get_cwl_file_format(ctype) - file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} - resolved_files.append(file_obj) + maps_files = resolve_ogc_api_maps_files(api_url, col_id, col_args, col_media_type, output_dir) + resolved_files.extend(maps_files) else: raise ValueError(f"Collection [{col_href}] could not be resolved. Unknown format [{col_fmt}].") @@ -263,6 +188,189 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO return resolved_files +def resolve_stac_collection_files( + api_url, # type: str + collection_id, # type: str + collection_args, # type: Dict[str, JSON] + supported_media_types, # type: List[str] + output_dir, # type: str +): # type: (...) 
-> List[CWL_IO_FileValue] + + # convert all parameters to their corresponding name of the query utility, and ignore unknown ones + for arg in list(collection_args): + if "-" in arg: + collection_args[arg.replace("-", "_")] = collection_args.pop(arg) + + timeout = collection_args.pop("timeout", 10) + col_properties = collection_args.pop("properties", None) # will become 'modifier' callable + + known_params = set(inspect.signature(ItemSearch).parameters) + known_params -= {"url", "method", "stac_io", "client", "collections", "ids"} + unknown_params = set(collection_args) - known_params + for param in unknown_params: + collection_args.pop(param) + + # STAC can perform filters and sorting server-side + # only perform the remaining properties modifier operations locally as needed + if col_properties: + def col_field_modifier(stac_feat_col): + for _feat in stac_feat_col.get("features", []): + handler = NestedDictHandler(_feat) + process_field_modifiers(_feat, properties=col_properties, property_handler=handler) + else: + col_field_modifier = None + + resolved_files = [] + search = ItemSearch( + url=f"{api_url}/search", + method="POST", + stac_io=StacApiIO(timeout=timeout, max_retries=3), # FIXME: add 'headers' with authorization/cookies? + collections=collection_id, + modifier=col_field_modifier, + **collection_args + ) + for item in search.items(): + for ctype in supported_media_types: + for _, asset in item.get_assets(media_type=ctype): # type: (..., Asset) + _, file_fmt = get_cwl_file_format(ctype) + file_obj = {"class": "File", "path": asset.href, "format": file_fmt} + resolved_files.append(file_obj) + else: + # allow resolution of the STAC feature itself if no specific asset media-type was matched + # (i.e.: accessing the collection as if it was a plain "OGC API - Features") + if ContentType.APP_GEOJSON in supported_media_types: + # already retrieved the contents, so might as well use them to avoid the extra request + # also, content is necessary to use the updated data in case 'properties' were injected + item_id = get_secure_filename(item.id) + path = os.path.join(output_dir, f"{item_id}.geojson") + with open(path, mode="w", encoding="utf-8") as file: + json.dump(item.to_dict(), file) + _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + resolved_files.append(file_obj) + + return resolved_files + + +def resolve_static_geojson_features_files( + collection_href, # type: str + collection_args, # type: Dict[str, JSON] + output_dir, # type: str +): # type: (...) 
-> List[CWL_IO_FileValue] + col_resp = request_extra( + "GET", + collection_href, + queries=collection_args, + headers={"Accept": f"{ContentType.APP_GEOJSON},{ContentType.APP_JSON}"}, + timeout=collection_args["timeout"], + retries=3, + only_server_errors=False, + ) + col_json = col_resp.json() + if not (col_resp.status_code == 200 and "features" in col_json): + raise ValueError(f"Could not parse [{collection_href}] as a GeoJSON FeatureCollection.") + + col_json = process_field_modifiers(col_json, **collection_args) + + resolved_files = [] + for i, feat in enumerate(col_json["features"]): + path = os.path.join(output_dir, f"feature-{i}.geojson") + with open(path, mode="w", encoding="utf-8") as file: + json.dump(feat, file) + _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + resolved_files.append(file_obj) + return resolved_files + + +def resolve_ogc_api_features_files( + api_url, # type: str + collection_id, # type: str + collection_args, # type: Dict[str, JSON] + supported_media_types, # type: List[str] + output_dir, # type: str +): # type: (...) -> List[CWL_IO_FileValue] + if str(collection_args.get("filter-lang")) == "cql2-json": + collection_args["cql"] = collection_args.pop("filter") + search = Features( + url=api_url, + # FIXME: add 'auth' or 'headers' authorization/cookies? + headers={"Accept": f"{ContentType.APP_GEOJSON}, {ContentType.APP_VDN_GEOJSON}, {ContentType.APP_JSON}"}, + ) + items = search.collection_items(collection_id, **collection_args) + if items.get("type") != "FeatureCollection" or "features" not in items: + raise ValueError( + f"Collection [{api_url}/collections/{collection_id}] using " + f"format [{ExecuteCollectionFormat.OGC_FEATURES}] did not return a GeoJSON FeatureCollection." + ) + + resolved_files = [] + for i, feat in enumerate(items["features"]): + # NOTE: + # since STAC is technically OGC API - Features compliant, both can be used interchangeably + # if media-types are non-GeoJSON and happen to contain STAC Assets, handle it as STAC transparently + if "assets" in feat and supported_media_types != [ContentType.APP_GEOJSON]: + for _, asset in feat["assets"].items(): # type: (str, JSON) + if asset["type"] in supported_media_types: + _, file_fmt = get_cwl_file_format(asset["type"]) + file_obj = {"class": "File", "path": asset["href"], "format": file_fmt} + resolved_files.append(file_obj) + else: + path = os.path.join(output_dir, f"feature-{i}.geojson") + with open(path, mode="w", encoding="utf-8") as file: + json.dump(feat, file) + _, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + resolved_files.append(file_obj) + return resolved_files + + +def resolve_ogc_api_coverages_files( + api_url, # type: str + collection_id, # type: str + collection_args, # type: Dict[str, JSON] + supported_media_types, # type: List[str] + output_dir, # type: str +): # type: (...) -> List[CWL_IO_FileValue] + cov = Coverages( + url=api_url, + # FIXME: add 'auth' or 'headers' authorization/cookies? 
+ headers={"Accept": ContentType.APP_JSON}, + ) + ctype = (supported_media_types or [ContentType.IMAGE_COG])[0] + ext = get_extension(ctype, dot=False) + path = os.path.join(output_dir, f"map.{ext}") + with open(path, mode="wb") as file: + data = cast(io.BytesIO, cov.coverage(collection_id)).getbuffer() + file.write(data) # type: ignore + _, file_fmt = get_cwl_file_format(ctype) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + return [file_obj] + + +def resolve_ogc_api_maps_files( + api_url, # type: str + collection_id, # type: str + collection_args, # type: Dict[str, JSON] + supported_media_types, # type: List[str] + output_dir, # type: str +): # type: (...) -> List[CWL_IO_FileValue] + maps = Maps( + url=api_url, + # FIXME: add 'auth' or 'headers' authorization/cookies? + headers={"Accept": ContentType.APP_JSON}, + ) + ctype = (supported_media_types or [ContentType.IMAGE_COG])[0] + ext = get_extension(ctype[0], dot=False) + path = os.path.join(output_dir, f"map.{ext}") + with open(path, mode="wb") as file: + data = cast(io.BytesIO, maps.map(collection_id)).getbuffer() + file.write(data) # type: ignore + _, file_fmt = get_cwl_file_format(ctype) + file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt} + return [file_obj] + + def process_cwl(collection_input, input_definition, output_dir): # type: (JobValueCollection, ProcessInputOutputItem, Path) -> CWL_IO_ValueMap files = process_collection(collection_input, input_definition, output_dir) diff --git a/weaver/processes/builtin/field_modifier_processor.cwl b/weaver/processes/builtin/field_modifier_processor.cwl new file mode 100644 index 000000000..c842dd100 --- /dev/null +++ b/weaver/processes/builtin/field_modifier_processor.cwl @@ -0,0 +1,54 @@ +#! /usr/bin/env cwl-runner +cwlVersion: v1.0 +class: CommandLineTool +id: field_modifier_processor +label: Field Modifier Processor +doc: | + Performs field modification over properties and contents using the specified input definitions. +# target the installed python pointing to weaver conda env to allow imports +baseCommand: ${WEAVER_ROOT_DIR}/bin/python +arguments: ["${WEAVER_ROOT_DIR}/weaver/processes/builtin/properties_processor.py", "-o", $(runtime.outdir)] +inputs: + properties: + doc: Properties definition submitted to the process and to be generated from input values. + type: File + format: "iana:application/json" + inputBinding: + prefix: --properties + filter: + doc: Filter expression submitted to the process and to be generated from input values. + type: File + format: "iana:application/json" + inputBinding: + prefix: --filter + filter-crs: + doc: Filter Coordinate Reference System (CRS) to employ with the 'filter' parameter. + type: string + default: "EPSG:4326" + inputBinding: + prefix: --filter-crs + filter-lang: + doc: Filter language to interpret the 'filter' parameter. + type: string? + inputBinding: + prefix: --filter-lang + sortBy: + doc: Sorting definition with relevant field names and ordering direction. + type: string? + inputBinding: + prefix: --sortby + values: + doc: Values available for content field modification. + type: File + format: "iana:application/json" + inputBinding: + prefix: -V +outputs: + referenceOutput: + doc: Generated file contents from specified field modifiers. 
+ type: File + # note: important to omit 'format' here, since we want to preserve the flexibility to retrieve 'any' reference + outputBinding: + outputEval: $(runtime.outdir)/* +$namespaces: + iana: "https://www.iana.org/assignments/media-types/" diff --git a/weaver/processes/builtin/field_modifier_processor.py b/weaver/processes/builtin/field_modifier_processor.py new file mode 100644 index 000000000..448ec4ead --- /dev/null +++ b/weaver/processes/builtin/field_modifier_processor.py @@ -0,0 +1,550 @@ +#!/usr/bin/env python +""" +Generates properties contents using the specified input definitions. +""" +import abc +import argparse +import functools +import json +import logging +import os +import sys +import uuid +from typing import TYPE_CHECKING + +from pyparsing import ( + Combine, + Literal, + OpAssoc, + ParserElement, + Word, + ZeroOrMore, + alphanums, + alphas, + infix_notation, + nums, + one_of +) + +CUR_DIR = os.path.abspath(os.path.dirname(__file__)) +sys.path.insert(0, CUR_DIR) +# root to allow 'from weaver import <...>' +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(CUR_DIR)))) + +# place weaver specific imports after sys path fixing to ensure they are found from external call +# pylint: disable=C0413,wrong-import-order +from weaver.formats import ContentType, get_cwl_file_format # isort:skip # noqa: E402 +from weaver.processes.builtin.utils import get_package_details # isort:skip # noqa: E402) +from weaver.utils import Lazify, fully_qualified_name, load_file, repr_json # isort:skip # noqa: E402 + +if TYPE_CHECKING: + from typing import Callable, Dict, List, Optional, Tuple, TypeAlias, TypeVar, Union + + from weaver.typedefs import ( + CWL_IO_ValueMap, + FieldModifierFilterCRS, + FieldModifierFilterExpression, + FieldModifierFilterLang, + FieldModifierProperties, + FieldModifierSortBy, + JSON, + Number, + Path + ) + from weaver.utils import LoggerHandler + + PropertyValue = TypeVar("PropertyValue") + PropertyVariable = Union[PropertyValue, Number] + PropertyGetter: TypeAlias = Callable[[str], PropertyVariable] + +PACKAGE_NAME, PACKAGE_BASE, PACKAGE_MODULE = get_package_details(__file__) + +# setup logger since it is not run from the main 'weaver' app +LOGGER = logging.getLogger(PACKAGE_MODULE) +LOGGER.addHandler(logging.StreamHandler(sys.stdout)) +LOGGER.setLevel(logging.INFO) + +# process details +__version__ = "1.0" +__title__ = "Field Modifier Processor" +__abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative + +OUTPUT_CWL_JSON = "cwl.output.json" + + +class PropertyHandler(abc.ABC): + def __init__(self, instance): + self.instance = instance + + @abc.abstractmethod + def __getattr__(self, property_name): + # type: (str) -> PropertyValue + raise NotImplementedError + + @abc.abstractmethod + def __setattr__(self, property_name, value): + # type: (str, PropertyValue) -> None + raise NotImplementedError + + @abc.abstractmethod + def __delattr__(self, property_name): + # type: (str) -> None + raise NotImplementedError + + @abc.abstractmethod + def __iter__(self): + # type: () -> List[PropertyValue] + raise NotImplementedError + + +class DictHandler(dict, PropertyHandler): + """ + Operates on dictionary properties directly by key names. + """ + def __getattr__(self, property_name): + return dict.__getitem__(self, property_name) + + +class NestedDictHandler(dict, PropertyHandler): + """ + Operates on dictionary properties with nested key names separated by dots (``.``). + + .. 
code-block:: python + + handler = NestedDictHandler({"A": {"B": 123}}) + handler.__getattr__("A.B") # returns 123 + handler.__setattr__("A.B", 456) # updates {"A": {"B": 456}} + """ + def _get_parent_nested(self, property_name): + # type: (str) -> Tuple[Optional[Union[Dict[str, JSON]]], Optional[Union[str, int]]] + props = property_name.split(".") + data = self + for idx, prop in enumerate(props): + if isinstance(data, dict): + data = data.get(prop) + elif isinstance(data, list) and str.isnumeric(prop): + data = data[int(prop)] + else: + raise ValueError(f"Invalid property accessor unresolved: {property_name}") + if idx >= len(props) - 2: + if data is None: + break + last = props[-1] + last = int(last) if str.isnumeric(last) else last + return data, last + return None, None + + def __getattr__(self, property_name): + data, last = self._get_parent_nested(property_name) + if last is not None: + return data[last] + + def __setattr__(self, property_name, value): + data, last = self._get_parent_nested(property_name) + if last is not None: + data[last] = value + + def __delattr__(self, property_name): + data, last = self._get_parent_nested(property_name) + if last is not None: + del data[last] + + +@functools.lru_cache(1024) +def create_expression_parser(): # pylint: disable=R1260,too-complex + # type: () -> ParserElement + """ + Creates a parser that can safely evaluate the underlying arithmetic expression with variable substitutions. + + .. seealso:: + A mixture of the following examples serve as reference for the expression implementation: + + - https://github.com/pyparsing/pyparsing/blob/master/examples/simpleArith.py + - https://github.com/pyparsing/pyparsing/blob/master/examples/eval_arith.py + - https://github.com/pyparsing/pyparsing/blob/master/examples/excel_expr.py + """ + + ParserElement.enablePackrat() + + class EvalConstant: + """ + Class to evaluate a parsed constant or variable. + """ + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, Union[PropertyValue, Number]], PropertyGetter) -> Union[PropertyValue, Number] + if self.value in variables: + return variables[self.value] + elif str(self.value).isnumeric(): + return float(self.value) + return getter(self.value) + + class EvalSignOp: + """ + Class to evaluate expressions with a leading ``+`` or ``-`` sign. + """ + + def __init__(self, tokens): + self.sign, self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable + mult = {"+": 1, "-": -1}[self.sign] + return mult * self.value.eval(variables, getter) + + def operator_operands(tokens): + """ + Generator to extract operators and operands in pairs. + """ + it = iter(tokens) + while 1: + try: + yield next(it), next(it) + except StopIteration: + break + + class EvalPowerOp: + """ + Class to evaluate power expressions. + """ + + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable + res = self.value[-1].eval(variables, getter) + for val in self.value[-3::-2]: + res = val.eval(variables, getter) ** res + return res + + class EvalMultOp: + """ + Class to evaluate multiplication and division expressions. 
+ """ + + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable + prod = self.value[0].eval(variables, getter) + for op, val in operator_operands(self.value[1:]): + if op == "*": + prod *= val.eval(variables, getter) + if op == "/": + prod /= val.eval(variables, getter) + return prod + + class EvalAddOp: + """ + Class to evaluate addition and subtraction expressions. + """ + + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable + _sum = self.value[0].eval(variables, getter) + for op, val in operator_operands(self.value[1:]): + if op == "+": + _sum += val.eval(variables, getter) + if op == "-": + _sum -= val.eval(variables, getter) + return _sum + + class EvalComparisonOp: + """ + Class to evaluate comparison expressions. + """ + opMap = { + "<": lambda a, b: a < b, + "<=": lambda a, b: a <= b, + ">": lambda a, b: a > b, + ">=": lambda a, b: a >= b, + "!=": lambda a, b: a != b, + "=": lambda a, b: a == b, + "LT": lambda a, b: a < b, + "LE": lambda a, b: a <= b, + "GT": lambda a, b: a > b, + "GE": lambda a, b: a >= b, + "NE": lambda a, b: a != b, + "EQ": lambda a, b: a == b, + "<>": lambda a, b: a != b, + } + + def __init__(self, tokens): + self.value = tokens[0] + + def eval(self, variables, getter): + # type: (Dict[str, PropertyVariable], PropertyGetter) -> PropertyVariable + val1 = self.value[0].eval(variables, getter) + for op, val in operator_operands(self.value[1:]): + fn = EvalComparisonOp.opMap[op] + val2 = val.eval(variables, getter) + if not fn(val1, val2): + break + val1 = val2 + else: + return True + return False + + # define the parser + integer = Word(nums) + real = Combine(Word(nums) + "." + Word(nums)) + ident = Word(alphas, alphanums + "_", min=1) + variable = Combine(ident + ZeroOrMore((Literal(":") | Literal(".")) + ident)) + operand = real | integer | variable + + sign_op = one_of("+ -") + mult_op = one_of("* /") + plus_op = one_of("+ -") + exp_op = one_of("** ^") + + # use parse actions to attach EvalXXX constructors to sub-expressions + operand.setParseAction(EvalConstant) + arith_expr = infix_notation( + operand, + [ + (sign_op, 1, OpAssoc.RIGHT, EvalSignOp), + (exp_op, 2, OpAssoc.LEFT, EvalPowerOp), + (mult_op, 2, OpAssoc.LEFT, EvalMultOp), + (plus_op, 2, OpAssoc.LEFT, EvalAddOp), + ], + ) + + comparison_op = one_of("< <= > >= != = <> LT GT LE GE EQ NE") + comp_expr = infix_notation( + arith_expr, + [ + (comparison_op, 2, OpAssoc.LEFT, EvalComparisonOp), + ], + ) + return comp_expr + + +def evaluate_property( + properties, # type: Dict[str, JSON] + property_expression, # type: str + property_handler, # type: PropertyHandler +): # type: (...) -> PropertyVariable + """ + Evaluates the applicable property expression with variable retrieval from reference data using the specifed handler. + + :param properties: Mapping of available property variables and expressions. + :param property_expression: Calculation to be evaluated for the property, possibly referring to other properties. + :param property_handler: Implementation that knows how to manipulate property access from the data. 
+ """ + parser = create_expression_parser() + expr = parser.parse_string(property_expression)[0] + result = expr.eval(properties, getter=property_handler.__getattr__) + return result + + +def process_field_modifiers( + values, # type: PropertyValue + *, # force named keyword arguments after + filter_expr=None, # type: FieldModifierFilterExpression, + filter_crs=None, # type: FieldModifierFilterCRS, + filter_lang=None, # type: FieldModifierFilterLang, + properties=None, # type: FieldModifierProperties, + property_handler=None, # type: PropertyHandler + sortby=None, # type: FieldModifierSortBy, + logger=LOGGER, # type: LoggerHandler +): # type: (...) -> Dict[str, PropertyValue] + """ + Processor of field modifiers for an input or output. + + .. note:: + Modifications are applied inline to the specified :paramref:`values` to allow integration + with various interfaces that have such expectations. For convenience, modified contents are + also returned for interfaces that expect the result as output. + + :param values: Values available for properties modification. + :param filter_expr: Filter expression submitted to the process and to be generated from input values. + :param filter_lang: Filter language to interpret the filter expression. + :param filter_crs: Filter Coordinate Reference System (CRS) to employ with the filter expression. + :param properties: Properties definition submitted to the process and to be generated from input values. + :param property_handler: Implementation that knows how to manipulate property access for the data destination. + :param sortby: Sorting definition with relevant field names and ordering direction. + :param logger: Optional logger handler to employ. + :return: File reference containing the resolved properties. + """ + logger.log( # pylint: disable=E1205 # false positive + logging.INFO, + ( + "Process [%s] got arguments:\n" + " filter_expr=%s\n" + " filter_crs=%s\n" + " filter_lang=%s\n" + " properties=%s\n" + " property_handler=%s\n" + " sortby=%s\n" + " values=%s" + ), + PACKAGE_NAME, + Lazify(lambda: repr_json(filter_expr, indent=2)), + Lazify(lambda: repr_json(filter_crs, indent=2)), + Lazify(lambda: repr_json(filter_lang, indent=2)), + Lazify(lambda: repr_json(properties, indent=2)), + Lazify(lambda: fully_qualified_name(property_handler) if property_handler else None), + Lazify(lambda: repr_json(sortby, indent=2)), + Lazify(lambda: repr_json(values, indent=2)), + ) + properties = properties or {} + + # if unspecified, consider the values as dict with handler + # (values will be set/retrieved directly by literal keys) + # alternate value handler need to be specified explicitly + if not property_handler: + property_handler = values + + # FIXME: handle filtering + # define a meta-set operation wrapping the property handler ? 
+    # eg: for each 'feature', do 'properties', but 'filter'/'sortby' over entire set of 'features'
+
+    # properties are a filtering subset
+    if isinstance(properties, list):
+        available_properties = set(property_handler)
+        requested_properties = set(properties)
+        for prop in available_properties - requested_properties:
+            del property_handler[prop]
+
+    # otherwise, properties are expressions
+    else:
+        # sort properties later if they depend on other ones, the least dependencies to be computed first
+        props_deps = {prop: 0 for prop in properties}
+        for prop, calc in properties.items():
+            for prop_dep in props_deps:
+                if prop == prop_dep:
+                    if prop in str(calc):
+                        raise ValueError(f"Invalid recursive property [{prop}] references itself.")
+                    continue
+                if prop_dep in str(calc):
+                    props_deps[prop_dep] += 1
+        if not any(dep[-1] == 0 for dep in props_deps.items()):
+            raise ValueError("Invalid properties all depend on another one. Impossible resolution order.")
+        props = sorted(
+            list(properties.items()),
+            key=lambda p: props_deps[p[0]],
+            reverse=True,
+        )
+
+        # compute the properties
+        for prop, calc in props:
+            result = evaluate_property(
+                properties,
+                str(calc),
+                property_handler=property_handler,
+            )
+            # make resolved values available for next iterations
+            # the 'properties' become a temporary variable buffer for cross-reference
+            # the 'property_handler' decides if that result should be inserted or not
+            properties[prop] = result
+            property_handler.__setattr__(prop, result)
+
+    # FIXME: handle sorting - over set of objects containing 'properties'
+
+    return properties
+
+
+def process_cwl(
+    input_filter_expr,   # type: FieldModifierFilterExpression
+    input_filter_crs,    # type: FieldModifierFilterCRS
+    input_filter_lang,   # type: FieldModifierFilterLang
+    input_properties,    # type: FieldModifierProperties
+    input_sortby,        # type: FieldModifierSortBy
+    input_values,        # type: Dict[str, JSON]
+    output_dir,          # type: Path
+):  # type: (...) -> CWL_IO_ValueMap
+    result = process_field_modifiers(
+        values=input_values,
+        filter_expr=input_filter_expr,
+        filter_crs=input_filter_crs,
+        filter_lang=input_filter_lang,
+        properties=input_properties,
+        sortby=input_sortby,
+    )
+    file_path = os.path.join(output_dir, f"{uuid.uuid4()}.json")
+    with open(file_path, mode="w", encoding="utf-8") as mod_file:
+        json.dump(result, mod_file, indent=2)
+    out_cwl_file = {
+        "class": "File",
+        "path": file_path,
+        "format": get_cwl_file_format(ContentType.APP_JSON, make_reference=True),
+    }
+    cwl_outputs = {"referenceOutput": out_cwl_file}  # output ID must match the one used in CWL definition
+    cwl_out_path = os.path.join(output_dir, OUTPUT_CWL_JSON)
+    with open(cwl_out_path, mode="w", encoding="utf-8") as cwl_out_file:
+        json.dump(cwl_outputs, cwl_out_file)
+    return cwl_outputs
+
+
+def main(*args):
+    # type: (*str) -> None
+    LOGGER.info("Process [%s] Parsing inputs...", PACKAGE_NAME)
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "-F", "--filter",
+        dest="input_filter_expr",
+        metavar="INPUT_FILTER_EXPRESSION",
+        required=False,
+        help="Filter definition submitted to the process and to be generated from input values.",
+    )
+    parser.add_argument(
+        "--filter-crs",
+        dest="input_filter_crs",
+        metavar="INPUT_FILTER_CRS",
+        required=False,
+        help="Filter Coordinate Reference System (CRS) to employ with the 'filter' parameter.",
+    )
+    parser.add_argument(
+        "--filter-lang",
+        dest="input_filter_lang",
+        metavar="INPUT_FILTER_LANGUAGE",
+        required=False,
+        help="Filter language to interpret the 'filter' parameter.",
+    )
+    parser.add_argument(
+        "-P", "--properties",
+        dest="input_properties",
+        metavar="INPUT_PROPERTIES",
+        required=False,
+        help="Properties definition submitted to the process and to be generated from input values.",
+    )
+    parser.add_argument(
+        "-S", "--sortby", "--sortBy", "--sort-by",
+        dest="input_sortby",
+        metavar="INPUT_SORTBY",
+        required=False,
+        help="Sorting definition with relevant field names and ordering direction.",
+    )
+    parser.add_argument(
+        "-V", "--values",
+        dest="input_values",
+        metavar="INPUT_VALUES",
+        required=True,
+        help="Values available for properties generation.",
+    )
+    parser.add_argument(
+        "-o", "--outdir",
+        dest="output_dir",
+        metavar="OUTDIR",
+        required=True,
+        help="Output directory of the retrieved data.",
+    )
+    ns = parser.parse_args(*args)
+    LOGGER.info("Process [%s] Loading properties input from file '%s'.", PACKAGE_NAME, ns.input_properties)
+    prop_in = load_file(ns.input_properties)
+    LOGGER.info("Process [%s] Loading values input from file '%s'.", PACKAGE_NAME, ns.input_values)
+    val_in = load_file(ns.input_values)
+    params = {**vars(ns)}
+    params.update({"input_properties": prop_in, "input_values": val_in})
+    sys.exit(process_cwl(**params) is not None)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py
index 8b6e58fa9..1f706941c 100644
--- a/weaver/processes/execution.py
+++ b/weaver/processes/execution.py
@@ -39,6 +39,7 @@
 from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode
 from weaver.processes import wps_package
 from weaver.processes.builtin.collection_processor import process_collection
+from weaver.processes.builtin.field_modifier_processor import process_field_modifiers
 from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema
 from weaver.processes.convert import (
     convert_input_values_schema,
@@ -672,7 +673,24 @@ def 
parse_wps_inputs(wps_process, job, container=None): f"Abort execution. Cannot map multiple outputs {list(out_ids)} " f"from [{proc_uri}] to input [{input_id}] of [{job.process}]." ) - resolved_input_values = [(results[0], input_info)] + results = results[0] + + field_modifier_properties = input_value.get("properties") + field_modifier_filter_expr = input_value.get("filter") + field_modifier_filter_crs = input_value.get("filter-crs") + field_modifier_filter_lang = input_value.get("filter-lang") + field_modifier_sortby = input_value.get("sortBy") + if field_modifier_properties or field_modifier_filter_expr or field_modifier_sortby: + results = process_field_modifiers( + results, + properties=field_modifier_properties, + filter_expr=field_modifier_filter_expr, + filter_crs=field_modifier_filter_crs, + filter_lang=field_modifier_filter_lang, + sortby=field_modifier_sortby, + ) + + resolved_input_values = [(results, input_info)] # typical file/data else: diff --git a/weaver/typedefs.py b/weaver/typedefs.py index a41c27a40..4389b4e7d 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -430,6 +430,32 @@ class CWL_SchemaName(Protocol): DataSource = Union[DataSourceFileRef, DataSourceOpenSearch] DataSourceConfig = Dict[str, DataSource] # JSON/YAML file contents + FieldModifierFilterExpression = Union[ + str, + JSON, # CQL or other + ] + FieldModifierFilterCRS = str + FieldModifierFilterLang = str + FieldModifierPropertiesFilter = List[str] + FieldModifierPropertiesExpr = Dict[str, str] + FieldModifierProperties = Union[FieldModifierPropertiesExpr, FieldModifierPropertiesFilter] + FieldModifierSortByItem = TypedDict("FieldModifierSortByItem", { + "field": str, + "direction": Literal["asc", "desc"], + }, total=True) + FieldModifierSortBy = Union[ + str, + List[str], + List[FieldModifierSortByItem], + ] + FiledModifiers = TypedDict("FiledModifiers", { + "filter": Optional[FieldModifierFilterExpression], + "filter-crs": Optional[FieldModifierFilterCRS], + "filter-lang": Optional[FieldModifierFilterLang], + "properties": Optional[FieldModifierProperties], + "sortBy": Optional[FieldModifierSortBy], + }, total=False) + JobValueBbox = TypedDict("JobValueBbox", { "bbox": Required[List[Number]], "crs": NotRequired[str], @@ -452,12 +478,17 @@ class CWL_SchemaName(Protocol): }, total=False) JobValueCollection = TypedDict("JobValueCollection", { "collection": Required[str], - "filter": Optional[JSON], - "filter-crs": Optional[str], - "filter-lang": Optional[str], - "sortBy": Optional[str], # FIXME: JSON? 
(https://github.com/opengeospatial/ogcapi-processes/issues/429) + "filter": Optional[FieldModifierFilterExpression], + "filter-crs": Optional[FieldModifierFilterCRS], + "filter-lang": Optional[FieldModifierFilterLang], + "properties": Optional[FieldModifierProperties], + "sortBy": Optional[FieldModifierSortBy], }, total=False) - JobValueNestedProcess = "ProcessExecution" # type: TypeAlias + JobValueNestedProcessOnly = "ProcessExecution" # type: TypeAlias + JobValueNestedProcess = Union[ + JobValueNestedProcessOnly, + FiledModifiers, + ] JobValueData = TypedDict("JobValueData", { "data": Required[AnyValueType], }, total=False) diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 524a1032a..21cc4377c 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -430,7 +430,7 @@ def __init__(self, schemes=None, path_pattern=None, msg=None, flags=re.IGNORECAS if path_pattern: if isinstance(path_pattern, RegexPattern): path_pattern = path_pattern.pattern - # depending colander version: $ end-of-line, \Z end-of-string (before \n if any), or \z end-of-string (\0) + # depends on colander version: $ end-of-line, \Z end-of-string (before \n if any), or \z end-of-string (\0) index = -2 if regex.lower().endswith(r"\z") else -1 if regex.endswith("$") else 0 regex = rf"{regex[:index] + path_pattern}\Z" super(SchemeURL, self).__init__(regex, msg=msg, flags=flags) @@ -1400,7 +1400,7 @@ def _order_deserialize(cstruct, sort_first=None, sort_after=None): return result -class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): +class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase, TypeConverter): """ Mapping schema that supports auto-insertion of JSON-schema references provided in the definition. @@ -1532,23 +1532,24 @@ def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs return self._schema_deserialize(cstruct, schema_id, None) return cstruct - def convert_type(self, cstruct, dispatcher=None): # noqa # parameter to allow forwarding ref for override schemas - # type: (OpenAPISchema, Optional[TypeConversionDispatcher]) -> OpenAPISchema + @staticmethod + def convert_type(node, cstruct, dispatcher=None): # noqa # parameter to allow forwarding ref for override schemas + # type: (colander.SchemaNode, OpenAPISchema, Optional[TypeConversionDispatcher]) -> OpenAPISchema """ Converts the node to obtain the :term:`JSON` schema definition. 
""" schema_id = schema_meta = None - schema_id_include = getattr(self, "_schema_include", False) - schema_id_include_convert_type = getattr(self, "_schema_include_convert_type", False) - schema_meta_include = getattr(self, "_schema_meta_include", False) - schema_meta_include_convert_type = getattr(self, "_schema_meta_include_convert_type", False) - schema_extra = getattr(self, "_schema_extra", None) + schema_id_include = getattr(node, "_schema_include", False) + schema_id_include_convert_type = getattr(node, "_schema_include_convert_type", False) + schema_meta_include = getattr(node, "_schema_meta_include", False) + schema_meta_include_convert_type = getattr(node, "_schema_meta_include_convert_type", False) + schema_extra = getattr(node, "_schema_extra", None) if schema_id_include and schema_id_include_convert_type: - schema_id = getattr(self, "_schema", None) + schema_id = getattr(node, "_schema", None) if schema_meta_include and schema_meta_include_convert_type: - schema_meta = getattr(self, "_schema_meta", None) + schema_meta = getattr(node, "_schema_meta", None) if schema_id or schema_meta or schema_extra: - return self._schema_deserialize(cstruct, schema_meta, schema_id, schema_extra) + return node._schema_deserialize(cstruct, schema_meta, schema_id, schema_extra) return cstruct @staticmethod diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 21475c1c2..7598f9f19 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -1591,7 +1591,7 @@ def convert(self, filter_expr, filter_lang): def deserialize(self, cstruct): # type: (JSON) -> Union[JSON, colander.null] result = super().deserialize(cstruct) - if not result: + if not cstruct: return result filter_expr = result.get("filter") filter_lang = result.get("filter-lang") @@ -1620,6 +1620,55 @@ def deserialize(self, cstruct): return result +class AnyPropertyExpression(AnyOfKeywordSchema): + _any_of = [ + PermissiveMappingSchema(validator=Length(min=1)), + ExtendedSchemaNode(Float()), + ExtendedSchemaNode(Integer()), + ExtendedSchemaNode(String(), validator=Length(min=1)), + ] + + +class PropertyFiltering(ExtendedSequenceSchema): + description = "Properties to filter from available data." + prop = ExtendedSchemaNode(String(), validator=Length(min=1)) + validator = Length(min=1) + + +class PropertiesExpression(ExtendedMappingSchema): + description = "Properties to compute from available data." + prop = AnyPropertyExpression( + variable="{property}", + description="Expression that defines how to compute the property.", + ) + validator = Length(min=1) + + +class PropertiesDefinition(OneOfKeywordSchema): + summary = "Properties to retrieve or compute." + description = "Properties produced from the data in the context of where they are specified." 
+ _one_of = [ + PropertyFiltering(), + PropertiesExpression(), + ] + + +class PropertiesSchema(ExtendedMappingSchema): + properties = PropertiesDefinition(missing=drop) + + def deserialize(self, cstruct): + result = super().deserialize(cstruct) + if "properties" in cstruct: + props = (result or {}).get("properties") + if cstruct["properties"] != props: + raise colander.Invalid( + node=self, + msg="Invalid properties definition could not be interpreted.", + value={"properties": repr_json(cstruct["properties"])}, + ) + return result + + class SortByExpression(ExpandStringList, ExtendedSchemaNode): schema_type = String default = None @@ -1631,9 +1680,38 @@ class SortByExpression(ExpandStringList, ExtendedSchemaNode): ) +class SortByDirection(ExtendedSchemaNode): + schema_type = String + validator = OneOf(["asc", "desc"]) + + +class SortByObject(StrictMappingSchema): + field = ExtendedSchemaNode(String()) + direction = SortByDirection() + + +class SortByItem(OneOfKeywordSchema): + _one_of = [ + SortByExpression(), + SortByObject(), + ] + + +class SortByList(ExtendedSequenceSchema): + item = SortByItem() + validator = Length(min=1) + + +class SortByDefinition(OneOfKeywordSchema): + _one_of = [ + SortByExpression(), + SortByList(), + ] + + class SortBySchema(ExtendedMappingSchema): - sort_by_lower = SortByExpression(name="sortby", missing=drop) - sort_by_upper = SortByExpression(name="sortBy", missing=drop) + sort_by_lower = SortByDefinition(name="sortby", missing=drop) + sort_by_upper = SortByDefinition(name="sortBy", missing=drop) def deserialize(self, cstruct): # type: (JSON) -> Union[JSON, colander.null] @@ -1644,17 +1722,30 @@ def deserialize(self, cstruct): Therefore, additional fields must be left untouched. """ result = super().deserialize(cstruct) - if not result: + if not cstruct: return result if result.get("sortby"): # keep only 'official' "sortBy" from OGC API Processes - # others OGC APIs use "sortby", but their query parameters are usually case-insensitive + # other APIs (Features, Records, STAC) use "sortby", but their query parameter is usually case-insensitive if not result.get("sortBy"): result["sortBy"] = result["sortby"] del result["sortby"] + sort_by = cstruct.get("sortby") or cstruct.get("sortBy") + if "sortBy" not in result and sort_by: + raise colander.Invalid( + node=self, + msg="Invalid sortBy expression could not be interpreted.", + value={"sortBy": repr_json(sort_by)}, + ) return result +class FieldModifierSchema(FilterSchema, SortBySchema, PropertiesSchema): + """ + Field modifiers that can operate on properties identified within the referenced content definition. + """ + + class SupportedCRS(ExtendedMappingSchema): crs = AnyCRS(title="CRS", description="Coordinate Reference System") default = ExtendedSchemaNode(Boolean(), missing=drop) @@ -3913,7 +4004,7 @@ class ExecuteCollectionFormatEnum(ExtendedSchemaNode): validator = OneOf(ExecuteCollectionFormat.values()) -class ExecuteCollectionInput(FilterSchema, SortBySchema, PermissiveMappingSchema): +class ExecuteCollectionInput(FieldModifierSchema, PermissiveMappingSchema): description = inspect.cleandoc(""" Reference to a 'collection' that can optionally be filtered, sorted, or parametrized. @@ -4315,7 +4406,7 @@ class ExecuteInputOutputs(ExtendedMappingSchema): ) -class ExecuteParameters(ExecuteInputOutputs): +class ExecuteParameters(ExtendedMappingSchema): """ Basic execution parameters that can be submitted to run a process.
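As another editorial sketch (not lines of this patch), a "collection" execute input combining the field modifiers validated by FieldModifierSchema above could resemble the following; the collection URL, filter expression, and property names are hypothetical:

    collection_input = {
        "collection": "https://example.com/collections/observations",   # hypothetical endpoint
        "filter": {"op": "=", "args": [{"property": "name"}, "test"]},  # CQL2 JSON expression
        "filter-lang": "cql2-json",
        "properties": ["name", "datetime"],                             # retain only these properties
        "sortBy": [{"field": "datetime", "direction": "desc"}],         # object form; a plain string is also accepted
    }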
@@ -4341,17 +4432,21 @@ class ExecuteParameters(ExecuteInputOutputs): subscribers = JobExecuteSubscribers(missing=drop) -class ExecuteProcessParameters(ExecuteParameters): +class ExecuteProcessParameters(ExecuteParameters, ExecuteInputOutputs, FieldModifierSchema): title = "ExecuteProcessParameters" _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" _sort_first = [ "title", "process", + "mode", + "response", "inputs", "outputs", + "filter", + "filter-crs", + "filter-lang", "properties", - "mode", - "response", + "sortBy", "subscribers", ] _title = JobTitle(name="title", missing=drop) @@ -4396,11 +4491,15 @@ class Execute(AllOfKeywordSchema): "title", "status", "process", + "mode", + "response", "inputs", "outputs", + "filter", + "filter-crs", + "filter-lang", "properties", - "mode", - "response", + "sortBy", "subscribers", ] _all_of = [