From ed0e8040098bb7565028252151ad8104e101c10b Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Tue, 13 May 2025 16:37:22 +0300 Subject: [PATCH 1/7] pass top level params to PropertiesFromEndpoint requester --- .../parsers/model_to_component_factory.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7ceb19f14..a647812d1 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2758,6 +2758,9 @@ def create_parent_stream_config( def create_properties_from_endpoint( self, model: PropertiesFromEndpointModel, config: Config, **kwargs: Any ) -> PropertiesFromEndpoint: + parameters = model.retriever.requester.parameters or {} + parameters.update(model.parameters or {}) + model.retriever.requester.parameters = parameters retriever = self._create_component_from_model( model=model.retriever, config=config, @@ -2808,8 +2811,12 @@ def create_query_properties( if isinstance(model.property_list, list): property_list = model.property_list else: + property_list_model = model.property_list + parameters = property_list_model.parameters if property_list_model.parameters is not None else {} + parameters.update(model.parameters or {}) + property_list_model.parameters = parameters property_list = self._create_component_from_model( - model=model.property_list, config=config, **kwargs + model=property_list_model, config=config, **kwargs ) property_chunking = ( @@ -3046,8 +3053,12 @@ def _get_url() -> str: ) if len(query_properties_definitions) == 1: + query_properties_definition_model = query_properties_definitions[0] + parameters = query_properties_definition_model.parameters if query_properties_definition_model.parameters is not None else {} + parameters.update(model.parameters or {}) + query_properties_definition_model.parameters = parameters query_properties = self._create_component_from_model( - model=query_properties_definitions[0], config=config + model=query_properties_definition_model, config=config ) elif ( hasattr(model.requester, "fetch_properties_from_endpoint") From a474f5e8410b45736621904d3bf17a6cc61ec46f Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Tue, 13 May 2025 13:46:06 +0000 Subject: [PATCH 2/7] Auto-fix lint and format issues --- .../declarative/parsers/model_to_component_factory.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a647812d1..b2bbc11f2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2812,7 +2812,9 @@ def create_query_properties( property_list = model.property_list else: property_list_model = model.property_list - parameters = property_list_model.parameters if property_list_model.parameters is not None else {} + parameters = ( + property_list_model.parameters if property_list_model.parameters is not None else {} + ) parameters.update(model.parameters or {}) property_list_model.parameters = parameters property_list = self._create_component_from_model( @@ -3054,7 +3056,11 @@ def _get_url() -> str: if len(query_properties_definitions) == 1: query_properties_definition_model = query_properties_definitions[0] - parameters = query_properties_definition_model.parameters if query_properties_definition_model.parameters is not None else {} + parameters = ( + query_properties_definition_model.parameters + if query_properties_definition_model.parameters is not None + else {} + ) parameters.update(model.parameters or {}) query_properties_definition_model.parameters = parameters query_properties = self._create_component_from_model( From 708ceeb93f34a854f6c87f0e0a57f6b60ed0e8dc Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Tue, 13 May 2025 17:12:37 +0300 Subject: [PATCH 3/7] mypy fix --- .../declarative/parsers/model_to_component_factory.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index b2bbc11f2..60daa7b49 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2758,9 +2758,11 @@ def create_parent_stream_config( def create_properties_from_endpoint( self, model: PropertiesFromEndpointModel, config: Config, **kwargs: Any ) -> PropertiesFromEndpoint: - parameters = model.retriever.requester.parameters or {} - parameters.update(model.parameters or {}) - model.retriever.requester.parameters = parameters + # CustomRetriever doesn't have requester parameter + if isinstance(model.retriever, SimpleRetrieverModel): + parameters = model.retriever.requester.parameters or {} + parameters.update(model.parameters or {}) + model.retriever.requester.parameters = parameters retriever = self._create_component_from_model( model=model.retriever, config=config, From f4ca66160c99f9afe07e83a291dfae2f07a992d4 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 14 May 2025 13:34:42 +0300 Subject: [PATCH 4/7] pass parameters in ManifestComponentTransformer --- .../parsers/manifest_component_transformer.py | 58 ++++++++-- .../parsers/model_to_component_factory.py | 23 +--- .../test_manifest_component_transformer.py | 109 ++++++++++++++++++ 3 files changed, 161 insertions(+), 29 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 44f414343..f9adb278f 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -120,14 +120,6 @@ def propagate_types_and_parameters( if found_type: propagated_component["type"] = found_type - # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters - # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could - # be json_schema are not objects but we believe this is not likely in our case because: - # * records are Mapping so objects hence SchemaLoader root should be an object - # * connection_specification is a Mapping - if "type" not in propagated_component or self._is_json_schema_object(propagated_component): - return propagated_component - # Combines parameters defined at the current level with parameters from parent components. Parameters at the current # level take precedence current_parameters = dict(copy.deepcopy(parent_parameters)) @@ -138,6 +130,29 @@ def propagate_types_and_parameters( else {**current_parameters, **component_parameters} ) + # When processing request parameters which is an object that does not have a type, so $parameters will not be passes to the object. + # But request parameters can have PropertyChunking object that needs to be updated with paranet $parameters. + # When there is a PropertyChunking object _process_property_chunking_property() is called to update PropertyChunking object with $parameters + # and set updated object to propagated_component, then it's returned without propagation. + if "type" not in propagated_component and self._is_property_chunking_component( + propagated_component + ): + propagated_component = self._process_property_chunking_property( + propagated_component, + parent_field_identifier, + current_parameters, + use_parent_parameters, + ) + + # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters + # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could + # be json_schema are not objects but we believe this is not likely in our case because: + # * records are Mapping so objects hence SchemaLoader root should be an object + # * connection_specification is a Mapping + + if "type" not in propagated_component or self._is_json_schema_object(propagated_component): + return propagated_component + # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if # both exist for parameter_key, parameter_value in current_parameters.items(): @@ -182,3 +197,30 @@ def propagate_types_and_parameters( @staticmethod def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool: return propagated_component.get("type") == "object" + + @staticmethod + def _is_property_chunking_component(propagated_component: Mapping[str, Any]) -> bool: + has_property_chunking = False + for k, v in propagated_component.items(): + if isinstance(v, dict) and v.get("type") == "QueryProperties": + has_property_chunking = True + return has_property_chunking + + def _process_property_chunking_property( + self, + propagated_component: Dict[str, Any], + parent_field_identifier: str, + current_parameters: Mapping[str, Any], + use_parent_parameters: Optional[bool] = None, + ) -> Dict[str, Any]: + for k, v in propagated_component.items(): + if isinstance(v, dict) and v.get("type") == "QueryProperties": + property_chunking_with_parameters = self.propagate_types_and_parameters( + parent_field_identifier, + v, + current_parameters, + use_parent_parameters=use_parent_parameters, + ) + propagated_component[k] = property_chunking_with_parameters + + return propagated_component diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 60daa7b49..7ceb19f14 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2758,11 +2758,6 @@ def create_parent_stream_config( def create_properties_from_endpoint( self, model: PropertiesFromEndpointModel, config: Config, **kwargs: Any ) -> PropertiesFromEndpoint: - # CustomRetriever doesn't have requester parameter - if isinstance(model.retriever, SimpleRetrieverModel): - parameters = model.retriever.requester.parameters or {} - parameters.update(model.parameters or {}) - model.retriever.requester.parameters = parameters retriever = self._create_component_from_model( model=model.retriever, config=config, @@ -2813,14 +2808,8 @@ def create_query_properties( if isinstance(model.property_list, list): property_list = model.property_list else: - property_list_model = model.property_list - parameters = ( - property_list_model.parameters if property_list_model.parameters is not None else {} - ) - parameters.update(model.parameters or {}) - property_list_model.parameters = parameters property_list = self._create_component_from_model( - model=property_list_model, config=config, **kwargs + model=model.property_list, config=config, **kwargs ) property_chunking = ( @@ -3057,16 +3046,8 @@ def _get_url() -> str: ) if len(query_properties_definitions) == 1: - query_properties_definition_model = query_properties_definitions[0] - parameters = ( - query_properties_definition_model.parameters - if query_properties_definition_model.parameters is not None - else {} - ) - parameters.update(model.parameters or {}) - query_properties_definition_model.parameters = parameters query_properties = self._create_component_from_model( - model=query_properties_definition_model, config=config + model=query_properties_definitions[0], config=config ) elif ( hasattr(model.requester, "fetch_properties_from_endpoint") diff --git a/unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py b/unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py index 67bf6b397..2a2dedbc1 100644 --- a/unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py +++ b/unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py @@ -460,3 +460,112 @@ def test_do_not_propagate_parameters_on_json_schema_object(): actual_component = transformer.propagate_types_and_parameters("", component, {}) assert actual_component == expected_component + + +def test_propagate_property_chunking(): + component = { + "type": "DeclarativeStream", + "streams": [ + { + "type": "DeclarativeStream", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://test.com", + "request_parameters": { + "properties": { + "type": "QueryProperties", + "property_list": { + "type": "PropertiesFromEndpoint", + "property_field_path": ["name"], + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://test.com", + "authenticator": { + "$ref": "#/definitions/authenticator" + }, + "path": "/properties/{{ parameters.entity }}/properties", + "http_method": "GET", + "request_headers": {"Content-Type": "application/json"}, + }, + }, + }, + "property_chunking": { + "type": "PropertyChunking", + "property_limit_type": "characters", + "property_limit": 15000, + }, + } + }, + }, + }, + "$parameters": {"entity": "test_entity"}, + } + ], + } + expected_component = { + "streams": [ + { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "retriever": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "requester": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "request_parameters": { + "properties": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "property_chunking": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "property_limit": 15000, + "property_limit_type": "characters", + "type": "PropertyChunking", + }, + "property_list": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "property_field_path": ["name"], + "retriever": { + "$parameters": {"entity": "test_entity"}, + "entity": "test_entity", + "requester": { + "$parameters": {"entity": "test_entity"}, + "authenticator": { + "$ref": "#/definitions/authenticator" + }, + "entity": "test_entity", + "http_method": "GET", + "path": "/properties/{{ " + "parameters.entity " + "}}/properties", + "request_headers": {"Content-Type": "application/json"}, + "type": "HttpRequester", + "url_base": "https://test.com", + }, + "type": "SimpleRetriever", + }, + "type": "PropertiesFromEndpoint", + }, + "type": "QueryProperties", + } + }, + "type": "HttpRequester", + "url_base": "https://test.com", + }, + "type": "SimpleRetriever", + }, + "type": "DeclarativeStream", + } + ], + "type": "DeclarativeStream", + } + transformer = ManifestComponentTransformer() + actual_component = transformer.propagate_types_and_parameters("", component, {}) + assert actual_component == expected_component From 69eb0cf93c68d6c4f6fddea5818bde891564354d Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 14 May 2025 14:07:49 +0300 Subject: [PATCH 5/7] refactor code --- .../parsers/manifest_component_transformer.py | 65 ++++++++++++------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index f9adb278f..e63dfee56 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -130,27 +130,39 @@ def propagate_types_and_parameters( else {**current_parameters, **component_parameters} ) - # When processing request parameters which is an object that does not have a type, so $parameters will not be passes to the object. - # But request parameters can have PropertyChunking object that needs to be updated with paranet $parameters. - # When there is a PropertyChunking object _process_property_chunking_property() is called to update PropertyChunking object with $parameters - # and set updated object to propagated_component, then it's returned without propagation. - if "type" not in propagated_component and self._is_property_chunking_component( - propagated_component - ): - propagated_component = self._process_property_chunking_property( - propagated_component, - parent_field_identifier, - current_parameters, - use_parent_parameters, - ) - # When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters # When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could # be json_schema are not objects but we believe this is not likely in our case because: # * records are Mapping so objects hence SchemaLoader root should be an object # * connection_specification is a Mapping + if self._is_json_schema_object(propagated_component): + return propagated_component - if "type" not in propagated_component or self._is_json_schema_object(propagated_component): + # For objects that don't have type check if their object fields have nested components which should have parameters in it. + # For example, QueryProperties in requester.request_parameters: + # requester: + # $ref: "#/definitions/base_requester" + # path: /path/to/entity/{{ parameters.entity }} + # request_parameters: + # archived: 'false' + # properties: + # type: QueryProperties + # property_list: + # retriever: + # type: SimpleRetriever + # requester: + # $ref: "#/definitions/base_requester" + # path: /path/to//{{ parameters.entity }}/properties + # .... + # Update propagated_component value with components if needed and return propagated_component. + if "type" not in propagated_component: + if self._has_nested_components(propagated_component): + propagated_component = self._process_nested_components( + propagated_component, + parent_field_identifier, + current_parameters, + use_parent_parameters, + ) return propagated_component # Parameters should be applied to the current component fields with the existing field taking precedence over parameters if @@ -196,17 +208,20 @@ def propagate_types_and_parameters( @staticmethod def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool: - return propagated_component.get("type") == "object" + return propagated_component.get("type") == "object" or propagated_component.get("type") == [ + "null", + "object", + ] @staticmethod - def _is_property_chunking_component(propagated_component: Mapping[str, Any]) -> bool: - has_property_chunking = False + def _has_nested_components(propagated_component: Mapping[str, Any]) -> bool: + has_nested_components = False for k, v in propagated_component.items(): - if isinstance(v, dict) and v.get("type") == "QueryProperties": - has_property_chunking = True - return has_property_chunking + if isinstance(v, dict) and v.get("type"): + has_nested_components = True + return has_nested_components - def _process_property_chunking_property( + def _process_nested_components( self, propagated_component: Dict[str, Any], parent_field_identifier: str, @@ -214,13 +229,13 @@ def _process_property_chunking_property( use_parent_parameters: Optional[bool] = None, ) -> Dict[str, Any]: for k, v in propagated_component.items(): - if isinstance(v, dict) and v.get("type") == "QueryProperties": - property_chunking_with_parameters = self.propagate_types_and_parameters( + if isinstance(v, dict) and v.get("type"): + nested_component_with_parameters = self.propagate_types_and_parameters( parent_field_identifier, v, current_parameters, use_parent_parameters=use_parent_parameters, ) - propagated_component[k] = property_chunking_with_parameters + propagated_component[k] = nested_component_with_parameters return propagated_component From d9f0d6fa5767cab9dd47f07fc96d68d50b518ee4 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 14 May 2025 14:26:47 +0300 Subject: [PATCH 6/7] updated doc comment --- .../parsers/manifest_component_transformer.py | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index e63dfee56..519c2a8fe 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -138,23 +138,9 @@ def propagate_types_and_parameters( if self._is_json_schema_object(propagated_component): return propagated_component - # For objects that don't have type check if their object fields have nested components which should have parameters in it. - # For example, QueryProperties in requester.request_parameters: - # requester: - # $ref: "#/definitions/base_requester" - # path: /path/to/entity/{{ parameters.entity }} - # request_parameters: - # archived: 'false' - # properties: - # type: QueryProperties - # property_list: - # retriever: - # type: SimpleRetriever - # requester: - # $ref: "#/definitions/base_requester" - # path: /path/to//{{ parameters.entity }}/properties - # .... - # Update propagated_component value with components if needed and return propagated_component. + # For objects that don't have type check if their object fields have nested components which should have `$parameters` in it. + # For example, QueryProperties in requester.request_parameters, etc. + # Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component. if "type" not in propagated_component: if self._has_nested_components(propagated_component): propagated_component = self._process_nested_components( From 173675087a363acfe8fb628f33e7232a490bdcd7 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 15 May 2025 11:45:32 +0300 Subject: [PATCH 7/7] refactor code --- .../parsers/manifest_component_transformer.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 519c2a8fe..8b1f45fcd 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -200,12 +200,11 @@ def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool: ] @staticmethod - def _has_nested_components(propagated_component: Mapping[str, Any]) -> bool: - has_nested_components = False + def _has_nested_components(propagated_component: Dict[str, Any]) -> bool: for k, v in propagated_component.items(): if isinstance(v, dict) and v.get("type"): - has_nested_components = True - return has_nested_components + return True + return False def _process_nested_components( self, @@ -214,14 +213,14 @@ def _process_nested_components( current_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None, ) -> Dict[str, Any]: - for k, v in propagated_component.items(): - if isinstance(v, dict) and v.get("type"): + for field_name, field_value in propagated_component.items(): + if isinstance(field_value, dict) and field_value.get("type"): nested_component_with_parameters = self.propagate_types_and_parameters( parent_field_identifier, - v, + field_value, current_parameters, use_parent_parameters=use_parent_parameters, ) - propagated_component[k] = nested_component_with_parameters + propagated_component[field_name] = nested_component_with_parameters return propagated_component