diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 44f414343..8b1f45fcd 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -120,14 +120,6 @@ def propagate_types_and_parameters( if found_type: propagated_component["type"] = found_type - # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters - # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could - # be json_schema are not objects but we believe this is not likely in our case because: - # * records are Mapping so objects hence SchemaLoader root should be an object - # * connection_specification is a Mapping - if "type" not in propagated_component or self._is_json_schema_object(propagated_component): - return propagated_component - # Combines parameters defined at the current level with parameters from parent components. Parameters at the current # level take precedence current_parameters = dict(copy.deepcopy(parent_parameters)) @@ -138,6 +130,27 @@ def propagate_types_and_parameters( else {**current_parameters, **component_parameters} ) + # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters + # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could + # be json_schema are not objects but we believe this is not likely in our case because: + # * records are Mapping so objects hence SchemaLoader root should be an object + # * connection_specification is a Mapping + if self._is_json_schema_object(propagated_component): + return propagated_component + + # For objects that don't have type check if their object fields have nested components which should have `$parameters` in it. + # For example, QueryProperties in requester.request_parameters, etc. + # Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component. + if "type" not in propagated_component: + if self._has_nested_components(propagated_component): + propagated_component = self._process_nested_components( + propagated_component, + parent_field_identifier, + current_parameters, + use_parent_parameters, + ) + return propagated_component + # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if # both exist for parameter_key, parameter_value in current_parameters.items(): @@ -181,4 +194,33 @@ def propagate_types_and_parameters( @staticmethod def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool: - return propagated_component.get("type") == "object" + return propagated_component.get("type") == "object" or propagated_component.get("type") == [ + "null", + "object", + ] + + @staticmethod + def _has_nested_components(propagated_component: Dict[str, Any]) -> bool: + for k, v in propagated_component.items(): + if isinstance(v, dict) and v.get("type"): + return True + return False + + def _process_nested_components( + self, + propagated_component: Dict[str, Any], + parent_field_identifier: str, + current_parameters: Mapping[str, Any], + use_parent_parameters: Optional[bool] = None, + ) -> Dict[str, Any]: + for field_name, field_value in propagated_component.items(): + if isinstance(field_value, dict) and field_value.get("type"): + nested_component_with_parameters = self.propagate_types_and_parameters( + parent_field_identifier, + field_value, + current_parameters, + use_parent_parameters=use_parent_parameters, + ) + propagated_component[field_name] = nested_component_with_parameters + + return propagated_component diff --git a/unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py b/unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py index 67bf6b397..2a2dedbc1 100644 --- a/unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py +++ b/unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py @@ -460,3 +460,112 @@ def test_do_not_propagate_parameters_on_json_schema_object(): actual_component = transformer.propagate_types_and_parameters("", component, {}) assert actual_component == expected_component + + +def test_propagate_property_chunking(): + component = { + "type": "DeclarativeStream", + "streams": [ + { + "type": "DeclarativeStream", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://test.com", + "request_parameters": { + "properties": { + "type": "QueryProperties", + "property_list": { + "type": "PropertiesFromEndpoint", + "property_field_path": ["name"], + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://test.com", + "authenticator": { + "$ref": "#/definitions/authenticator" + }, + "path": "/properties/{{ parameters.entity }}/properties", + "http_method": "GET", + "request_headers": {"Content-Type": "application/json"}, + }, + }, + }, + "property_chunking": { + "type": "PropertyChunking", + "property_limit_type": "characters", + "property_limit": 15000, + }, + } + }, + }, + }, + "$parameters": {"entity": "test_entity"}, + } + ], + } + expected_component = { + "streams": [ + { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "retriever": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "requester": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "request_parameters": { + "properties": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "property_chunking": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "property_limit": 15000, + "property_limit_type": "characters", + "type": "PropertyChunking", + }, + "property_list": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "property_field_path": ["name"], + "retriever": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "requester": { + "$parameters": {"entity": "test_entity"}, + "authenticator": { + "$ref": "#/definitions/authenticator" + }, + "entity": "test_entity", + "http_method": "GET", + "path": "/properties/{{ " + "parameters.entity " + "}}/properties", + "request_headers": {"Content-Type": "application/json"}, + "type": "HttpRequester", + "url_base": "https://test.com", + }, + "type": "SimpleRetriever", + }, + "type": "PropertiesFromEndpoint", + }, + "type": "QueryProperties", + } + }, + "type": "HttpRequester", + "url_base": "https://test.com", + }, + "type": "SimpleRetriever", + }, + "type": "DeclarativeStream", + } + ], + "type": "DeclarativeStream", + } + transformer = ManifestComponentTransformer() + actual_component = transformer.propagate_types_and_parameters("", component, {}) + assert actual_component == expected_component